LingxinMeng 1 jaar geleden
bovenliggende
commit
e9b2818358

+ 7 - 1
amd64/dispatch_server/package/infra/application.yaml

@@ -16,12 +16,14 @@ redis:
   db: 3
 
 kafka:
+  partition: 0
   brokers:
     - 10.14.85.239:9092
 
 oss:
+  type: Aliyun # Minio 或 Aliyun
   is-user-cname: true
-  endpoint: http://open-bucket.oss.icvdc.com
+  endpoint: open-bucket.oss.icvdc.com
   #  is-user-cname: false
   #  endpoint: oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com
   access-key-id: n8glvFGS25MrLY7j
@@ -41,3 +43,7 @@ k8s:
   vtd-pod-template-yaml: /mnt/disk001/cicv-data-closedloop/pod-template/vtd-pod-template.yaml
   algorithm-tar-temp-dir: /mnt/disk001/cicv-data-closedloop/temp/algorithm/
   registry-uri: 10.14.85.237:5000
+  namespace-name: cicvdcl
+  vtd-image: 10.14.85.237:5000/vtd.run.perception.release:latest
+  vtd-command: /Controller/config/docker_cloud_algContest.ini
+

+ 6 - 1
amd64/dispatch_server/package/infra/i_application.go

@@ -37,10 +37,12 @@ type RedisStruct struct {
 }
 
 type KafkaStruct struct {
-	Brokers []string `yaml:"brokers"`
+	Partition int32    `yaml:"partition"`
+	Brokers   []string `yaml:"brokers"`
 }
 
 type OssStruct struct {
+	Type            string `yaml:"type"`
 	IsUseCname      bool   `yaml:"is-use-cname"`
 	Endpoint        string `yaml:"endpoint"`
 	AccessKeyId     string `yaml:"access-key-id"`
@@ -59,6 +61,9 @@ type K8sStruct struct {
 	VtdPodTemplateYaml  string `yaml:"vtd-pod-template-yaml"`
 	AlgorithmTarTempDir string `yaml:"algorithm-tar-temp-dir"`
 	RegistryUri         string `yaml:"registry-uri"`
+	NamespaceName       string `yaml:"namespace-name"`
+	VtdImage            string `yaml:"vtd-image"`
+	VtdCommand          string `yaml:"vtd-command"`
 }
 
 var (

+ 39 - 14
amd64/dispatch_server/package/service/run_task.go

@@ -129,12 +129,10 @@ func RunWaitingCluster() {
 			continue
 		}
 		topic := projectId
-		value := []byte(taskJson)
-
 		// 创建一个Message,并指定分区为0
 		msg := &kafka.Message{
-			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: kafka.Offset(offset)},
-			Value:          value,
+			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: infra.ApplicationYaml.Kafka.Partition, Offset: kafka.Offset(offset)},
+			Value:          []byte(taskJson),
 		}
 		// 发送消息,并处理结果
 		err = infra.GlobalKafkaProducer.Produce(msg, nil)
@@ -166,9 +164,13 @@ func RunWaitingCluster() {
 			c_log.GlobalLogger.Errorf("删除算法镜像文件 %v 失败,错误信息为:%v", algorithmTarPath, err)
 		}
 		// --------------- 启动 k8s pod ---------------
-		nodeName := gpuNode.Hostname
-		// 1 生成 podName
 		podName := "project-" + projectId + "-" + util.NewShortUUID()
+		namespaceName := infra.ApplicationYaml.K8s.NamespaceName
+		nodeName := gpuNode.Hostname
+		restParallelism := gpuNode.Parallelism
+		vtdContainer := "vtd-" + projectId
+		algorithmContainer := "algorithm-" + projectId
+		vtdImage := infra.ApplicationYaml.K8s.VtdImage
 		// 2 生成模板文件名称
 		podYaml := nodeName + "#" + podName + ".yaml"
 		// 3 模板yaml存储路径
@@ -182,15 +184,38 @@ func RunWaitingCluster() {
 			infra.GlobalLogger.Error(err)
 			continue
 		}
-		podString = strings.Replace(podString, "vtd-container-name", "vtd-"+projectId+"-"+nodeName, 1)
-		podString = strings.Replace(podString, "cicv-data-closedloop-ip", infra.ApplicationYaml.Web.IpPrivate, 1)
-		podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Brokers[0], 1)
-		podString = strings.Replace(podString, "kafka-topic", projectId, 1)
-		// 发送消息之后会拿到消息的分区和偏移量
-		podString = strings.Replace(podString, "kafka-partition", projectId, 1)
-		podString = strings.Replace(podString, "kafka-offset", projectId, 1)
-		// todo cpu编号是剩余并行度减一
+		podString = strings.Replace(podString, "pod-name", podName, -1)
+		podString = strings.Replace(podString, "namespace-name", namespaceName, -1)
+		podString = strings.Replace(podString, "node-name", nodeName, -1)
+		podString = strings.Replace(podString, "algorithm-image", algorithmImageName, -1)
+		podString = strings.Replace(podString, "vtd-container", vtdContainer, -1)
+		podString = strings.Replace(podString, "vtd-image", vtdImage, -1)
+		podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommand, -1)
+		podString = strings.Replace(podString, "platform-ip", infra.ApplicationYaml.Web.IpPrivate, -1)
+		podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.Oss.Type, -1)
+		podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.Oss.Endpoint, -1) // 不带http://前缀
+		podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.Oss.AccessKeyId, -1)
+		podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.Oss.AccessKeySecret, -1)
+		podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Brokers[0], -1)
+		podString = strings.Replace(podString, "kafka-topic", projectId, -1)
+		podString = strings.Replace(podString, "kafka-partition", "\""+util.ToString(infra.ApplicationYaml.Kafka.Partition)+"\"", -1)
+		podString = strings.Replace(podString, "kafka-offset", "\""+util.ToString(offset)+"\"", -1)
+		podString = strings.Replace(podString, "cpu-order", "\""+util.ToString(restParallelism-1)+"\"", -1) // cpu编号是剩余并行度-1
+		podString = strings.Replace(podString, "algorithm-container", algorithmContainer, -1)
 
+		// --------------- 保存成文件
+		err = util.WriteFile(podString, yamlPath)
+		err = util.WriteFile(podString, yamlPathBak)
+		if err != nil {
+			infra.GlobalLogger.Error("保存yaml字符串失败,错误信息为", err)
+			continue
+		}
+		// --------------- 启动 pod
+		_, s2, err := util.Execute("kubectl", "apply", "-f", yamlPath)
+		if err != nil {
+			infra.GlobalLogger.Errorf("保存yaml字符串失败,执行结果为 %v,错误信息为 %v", s2, err)
+			continue
+		}
 		// --------------- 移除头元素
 		_, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
 		if err != nil {

+ 38 - 0
amd64/dispatch_server/package/util/u_file.go

@@ -3,6 +3,7 @@ package util
 import (
 	"io"
 	"os"
+	"path/filepath"
 )
 
 func ReadFile(filePath string) (string, error) {
@@ -32,3 +33,40 @@ func RemoveFile(path string) error {
 	}
 	return nil
 }
+
+// WriteFile writes sourceContent to targetFilePath, creating missing parent
+// directories first. An existing file is truncated and overwritten.
+func WriteFile(sourceContent string, targetFilePath string) error {
+	// os.WriteFile creates/truncates the target itself; only the directory is needed.
+	if err := CreateParentDir(targetFilePath); err != nil {
+		return err
+	}
+	return os.WriteFile(targetFilePath, []byte(sourceContent), 0644)
+}
+
+// CreateFile creates an empty file at filePath, creating missing parent
+// directories first. An existing file is truncated to zero length.
+//
+// It returns the error from creating the parent directories, from creating
+// the file, or from closing the freshly created handle.
+func CreateFile(filePath string) error {
+	if err := CreateParentDir(filePath); err != nil {
+		return err
+	}
+	// os.Create truncates the file when it already exists.
+	file, err := os.Create(filePath)
+	if err != nil {
+		return err
+	}
+	// Only the on-disk side effect is wanted, so the handle is closed right
+	// away; a failed close is reported to the caller instead of being dropped.
+	return file.Close()
+}
+
+// CreateParentDir ensures the directory containing filePath exists, creating
+// it and any missing ancestors; an existing directory is left untouched.
+func CreateParentDir(filePath string) error {
+	parentDir := filepath.Dir(filePath)
+	// os.ModePerm (0777) is the requested mode for newly created directories.
+	return os.MkdirAll(parentDir, os.ModePerm)
+}

+ 4 - 4
amd64/dispatch_server/vtd-pod-template_20240506.yaml

@@ -39,10 +39,8 @@ spec:
               fieldPath: metadata.name
         - name: LM_LICENSE_FILE
           value: 27500@10.14.85.247
-        - name: SIMULATION_CLOUD_IP
-          value: simulation-cloud-ip
-        - name: KAFKA_IP
-          value: kafka-ip
+        - name: PLATFORM_IP
+          value: platform-ip
         - name: OSS_TYPE
           value: oss-type
         - name: OSS_IP
@@ -53,6 +51,8 @@ spec:
           value: oss-secret-key
         - name: OSS_BUCKET_NAME
           value: oss-bucket
+        - name: KAFKA_IP
+          value: kafka-ip
         - name: KAFKA_PARTITION
           value: kafka-partition
         - name: KAFKA_OFFSET