martin 3 жил өмнө
parent
commit
bee60e08b8

+ 30 - 20
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -11,6 +11,9 @@ import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.*;
 import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.apis.BatchV1Api;
+import io.kubernetes.client.openapi.models.V1Job;
+import io.kubernetes.client.util.Yaml;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -22,6 +25,7 @@ import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -59,6 +63,10 @@ public class ManualProjectConsumer {
     ApiClient apiClient;
     @Value("${scheduler.manual-project.topic}")
     String manualProjectTopic;
+    @Value("${scheduler.manual-project.result-path-minio}")
+    String resultPathMinio;
+    @Value("${scheduler.manual-project.job-template-yaml}")
+    String jobTemplateYaml;
     @Value("${scheduler.linux-temp-path}")
     String linuxTempPath;
 
@@ -71,7 +79,7 @@ public class ManualProjectConsumer {
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.topic}")
     @SneakyThrows
     public void parseProject(ConsumerRecord<String, String> projectRecord) {
-        log.info("------- parseProject 接收到消息为:" + projectRecord);
+        log.info("------- ManualProjectConsumer 接收到消息为:" + projectRecord);
         //1 读取 kafka 的 project 信息
         /*
             {
@@ -114,13 +122,13 @@ public class ManualProjectConsumer {
             }
         }
         List<ScenePO> sceneList = new ArrayList<>();
-        if (CollectionUtil.isNotEmpty(naturalIdList)){
+        if (CollectionUtil.isNotEmpty(naturalIdList)) {
             sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
         }
-        if (CollectionUtil.isNotEmpty(standardIdList)){
+        if (CollectionUtil.isNotEmpty(standardIdList)) {
             sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
         }
-        if (CollectionUtil.isNotEmpty(accidentIdList)){
+        if (CollectionUtil.isNotEmpty(accidentIdList)) {
             sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
         }
         projectMapper.updateTaskNumber(projectId, sceneList.size()); // 有多少场景就有多少任务
@@ -137,7 +145,7 @@ public class ManualProjectConsumer {
 
         for (ScenePO scenePO : sceneList) {
             String taskId = StringUtil.getRandomUUID();
-            String resultPath = linuxTempPath + projectId + "/" + taskId;
+            String resultPath = resultPathMinio + projectId + "/" + taskId;
             // 保存任务信息
             TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
                     .id(taskId)
@@ -202,15 +210,14 @@ public class ManualProjectConsumer {
             //4-4 将对象转成 json
             String taskJson = JsonUtil.beanToJson(taskTO);
             //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
-//            kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
-            kafkaTemplate.send("test", taskJson).addCallback(success -> {
+            kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
                 // 消息发送到的topic
                 String topic = success.getRecordMetadata().topic();
                 // 消息发送到的分区
                 int partition = success.getRecordMetadata().partition();
                 // 消息在分区内的offset
                 long offset = success.getRecordMetadata().offset();
-                log.info("------- 发送消息成功:\n"
+                log.info("------- ManualProjectConsumer 发送消息成功:\n"
                         + "主题 topic 为:" + topic + "\n"
                         + "分区 partition 为:" + partition + "\n"
                         + "偏移量为:" + offset + "\n"
@@ -221,10 +228,10 @@ public class ManualProjectConsumer {
         }
 
         // -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
-        // 私有仓库导入算法镜像
-        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
-        //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
-        AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
+//        // 私有仓库导入算法镜像
+//        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
+//        //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
+//        AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
 //        String minioPath = algorithmPO.getMinioPath();
 //        String dockerImage;
 //        if ("0".equals(algorithmPO.getDockerImport())) {
@@ -240,10 +247,12 @@ public class ManualProjectConsumer {
 //            throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
 //        }
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
-//        int completions = sceneList.size();     // 结束标
-//        int parallelism = projectMessageDTO.getParallelism();    // 并行度
-//        BatchV1Api batchV1Api = new BatchV1Api(apiClient);
-////        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
+        int completions = sceneList.size();     // 结束标
+        int parallelism = projectMessageDTO.getParallelism();    // 并行度
+        log.info("------- ManualProjectConsumer 项目 " + projectId + " 的完成度为:" + completions);
+        log.info("------- ManualProjectConsumer 项目 " + projectId + " 的并行度为:" + parallelism);
+        BatchV1Api batchV1Api = new BatchV1Api(apiClient);
+        V1Job yaml = (V1Job) Yaml.load(new File("/opt/simulation-cloud/simulation-resource-scheduler/conf/job-template.yaml"));
 //        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-test.yaml"));
 //        //1 apiVersion
 //        //2 kind
@@ -271,7 +280,8 @@ public class ManualProjectConsumer {
 //        }
 //        //4-4 创建
 //        yaml.setSpec(job);
-//        batchV1Api.createNamespacedJob("simulation-cloud", yaml, null, null, null);
+        log.info("------- ManualProjectConsumer 创建 job:" + yaml);
+        batchV1Api.createNamespacedJob("simulation-cloud", yaml, null, null, null);
     }
 
 
@@ -322,13 +332,13 @@ public class ManualProjectConsumer {
             }
         }
         List<ScenePO> sceneList = new ArrayList<>();
-        if (CollectionUtil.isNotEmpty(naturalIdList)){
+        if (CollectionUtil.isNotEmpty(naturalIdList)) {
             sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
         }
-        if (CollectionUtil.isNotEmpty(standardIdList)){
+        if (CollectionUtil.isNotEmpty(standardIdList)) {
             sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
         }
-        if (CollectionUtil.isNotEmpty(accidentIdList)){
+        if (CollectionUtil.isNotEmpty(accidentIdList)) {
             sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
         }
         projectMapper.updateTaskNumber(projectId, sceneList.size()); // 有多少场景就有多少任务

+ 29 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/MinioController.java

@@ -0,0 +1,29 @@
+package com.css.simulation.resource.scheduler.controller;
+
+import com.css.simulation.resource.scheduler.util.MinioUtil;
+import io.minio.MinioClient;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestPart;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.multipart.MultipartFile;
+
+@RequestMapping("/minio")
+@RestController
+@Slf4j
+public class MinioController {
+
+    @Autowired
+    MinioClient minioClient;
+
+
+    @PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
+    @SneakyThrows
+    public void upload(@RequestPart("file") MultipartFile file, String path) {
+        MinioUtil.uploadFromMultipartFile(minioClient, file, "test", path);
+    }
+}

+ 2 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -50,6 +50,8 @@ public class TaskService {
     IndexTemplateMapper indexTemplateMapper;
     @Value("${scheduler.manual-project.topic}")
     String manualProjectTopic;
+    @Value("${scheduler.manual-project.result-path-minio}")
+    String resultPathMinio;
     @Value("${scheduler.score.hostname}")
     String hostname;
     @Value("${scheduler.score.username}")

+ 6 - 27
simulation-resource-scheduler/src/main/resources/kubernetes/template/job/job-template.yaml

@@ -2,31 +2,23 @@
 apiVersion: batch/v1
 kind: Job
 metadata:
-  name: job-vtd-perception-1
+  name: job-cloud-simulation
   namespace: default
   labels:
     user: EY
 spec:
   template:
     metadata:
-      name: pod-vtd-perception
+      name: pod-cloud-simulation
     spec:
       containers:
         - name: vtd
           image: vtd.run.perception:latest
           imagePullPolicy: Never
-          command: [ "/Controller/VTDController", "/Controller/config/local_docker.ini" ]
+          command: [ "/Controller/VTDController", "/Controller/config/docker_cloud.ini" ]
           env:
-            - name: NodeID
-              valueFrom:
-                fieldRef:
-                  fieldPath: metadata.name
             - name: LM_LICENSE_FILE
-              value: 27500@192.168.10.179
-          ports:
-            - name: ROS-Master-IP
-              containerPort: 11311
-              protocol: TCP
+              value: 27500@172.14.1.103
           volumeMounts:
             - name: nvidia0
               mountPath: /dev/nvidia0
@@ -35,22 +27,9 @@ spec:
           securityContext:
             privileged: true
         - name: algorithm
-          image: sharing_van_algorithm:latest
+          image: aeb.ros:latest
           imagePullPolicy: Never
-          command: [ "/simulation_workspace/start.sh" ]
-          env:
-            - name: VtdIP
-              valueFrom:
-                fieldRef:
-                  fieldPath: status.podIP
-            - name: VehicleID
-              value: 1
-            - name: SensorPort
-              value: 50000
-          ports:
-            - name: ROS-Master-IP
-              containerPort: 11311
-              protocol: TCP
+          command: [ "/AEB/start_docker.sh" ]
       restartPolicy: Never
       volumes:
         - name: nvidia0

+ 0 - 39
simulation-resource-scheduler/src/main/resources/kubernetes/template/job/job-test.yaml

@@ -1,39 +0,0 @@
-apiVersion: v1
-kind: Pod
-metadata:
-  name: ros-algorithm-test
-spec:
-  restartPolicy: Never
-  containers:
-    - name: vtd
-      image: vtd.run.perception:latest
-      imagePullPolicy: Never
-#      command: ["/Controller/VTDController", "/Controller/config/local_docker.ini"]
-      command: [ "/bin/sh","-c","touch /tmp/hello.txt;while true;do /bin/echo $(date +%T) >> /tmp/hello.txt; sleep 3000; done;" ]
-      env:
-        - name: NodeID
-          valueFrom:
-            fieldRef:
-              fieldPath: metadata.name
-        - name: LM_LICENSE_FILE
-          value: 27500@192.168.10.179
-      volumeMounts:
-        - name: nvidia0
-          mountPath: /dev/nvidia0
-        - name: nvidiactl
-          mountPath: /dev/nvidiactl
-      securityContext:
-        privileged: true
-    - name: algorithm
-      image: aeb.ros:latest
-      imagePullPolicy: Never
-#        command: [ "/AEB/start.sh" ]
-      command: [ "/bin/sh","-c","touch /tmp/hello.txt;while true;do /bin/echo $(date +%T) >> /tmp/hello.txt; sleep 3000; done;" ]
-  volumes:
-    - name: nvidia0
-      hostPath:
-        path: /dev/nvidia0
-    - name: nvidiactl
-      hostPath:
-        path: /dev/nvidiactl
-