martin hace 3 años
padre
commit
00e117bff0

+ 21 - 225
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -222,27 +222,29 @@ public class ManualProjectConsumer {
         }
 
         // -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
-        // 私有仓库导入算法镜像
-        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
-        //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
+//        // 私有仓库导入算法镜像
+//        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
+//        //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
 //        AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
 //        if (algorithmPO == null){
 //            // 访问索为远程接口
+//        } else{
+//            String minioPath = algorithmPO.getMinioPath();
+//            String dockerImage;
+//            if ("0".equals(algorithmPO.getDockerImport())) {
+//                dockerImage = "algorithm_" + algorithmId + ":latest";
+//                String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
+//                // 下载算法文件到本地( 2 到仓库服务器)
+//                MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
+//                //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
+//                LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
+//            } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
+//                dockerImage = algorithmPO.getDockerImage();
+//            } else {
+//                throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
+//            }
 //        }
-//        String minioPath = algorithmPO.getMinioPath();
-//        String dockerImage;
-//        if ("0".equals(algorithmPO.getDockerImport())) {
-//            dockerImage = "algorithm_" + algorithmId + ":latest";
-//            String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
-//            // 下载算法文件到本地( 2 到仓库服务器)
-//            MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
-//            //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
-//            LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
-//        } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
-//            dockerImage = algorithmPO.getDockerImage();
-//        } else {
-//            throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
-//        }
+
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
         int completions = sceneList.size();     // 结束标
         int parallelism = projectMessageDTO.getParallelism();    // 并行度
@@ -263,218 +265,12 @@ public class ManualProjectConsumer {
         stringBuilder.replace(j + "parallelism: ".length(), j + "parallelism: ".length() + 1, parallelism + "");
         String yamlTarget0 = stringBuilder.toString();
         String yamlTarget1 = yamlTarget0.replace("apiVers1on", "apiVersion");
+        String yamlTarget2 = yamlTarget1.replace("1atch/v1", "batch/v1");
         log.info("------- ManualProjectConsumer 开始执行 yaml 文件" + yamlTarget1);
-        FileUtil.writeStringToLocalFile(yamlTarget1, jobTemplateYamlPathTarget);
+        FileUtil.writeStringToLocalFile(yamlTarget2, jobTemplateYamlPathTarget);
         LinuxUtil.execute("kubectl apply -f " + jobTemplateYamlPathTarget);
     }
 
 
-    @SneakyThrows
-//    public void parseProject(ConsumerRecord<String, String> projectRecord) {
-    public void parseProjectService(String projectJson) {
-        log.info("------- parseProject 接收到消息为:" + projectJson);
-//        log.info("------- parseProject 接收到消息为:" + projectRecord);
-        //1 读取 kafka 的 project 信息
-        /*
-            {
-                "projectId": "sadfasdfs",	// 项目 id
-                "algorithmId": "sadfasdfs",	// 算法 id
-                "vehicleConfigId": "sadfasdfs",	// 车辆 id
-                "scenePackageId": "sadfasdfs",	// 场景包 id
-                "maxSimulationTime": 11111,	// 最大仿真时间
-                "parallelism": 30		// 并行度
-            }
-         */
-//        String projectJson = projectRecord.value();
-        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
-        String projectId = projectMessageDTO.getProjectId();    // 项目 id
-        projectMapper.updateProjectState(projectId, DictConstants.PROJECT_RUNNING);   // 修改该 project 的状态为执行中
-
-
-        // -------------------------------- 1 场景 --------------------------------
-        // 根据 packageId,拿到场景 id,然后获取每个场景的文件地址。
-        String packageId = projectMessageDTO.getScenePackageId();
-        List<IndexTemplatePO> leafIndexList = indexTemplateMapper.selectLeafIndexByPackageId(packageId);
-        List<String> naturalIdList = new ArrayList<>();
-        List<String> standardIdList = new ArrayList<>();
-        List<String> accidentIdList = new ArrayList<>();
-        for (IndexTemplatePO indexTemplatePO : leafIndexList) {
-            String naturalIds = indexTemplatePO.getSceneNaturalIds();
-            String standardIds = indexTemplatePO.getSceneStatueIds();
-            String accidentIds = indexTemplatePO.getSceneTrafficIds();
-            if (StringUtil.isNotEmpty(naturalIds)) {
-                String[] naturalIdArray = naturalIds.split(",");
-                naturalIdList.addAll(Arrays.asList(naturalIdArray));
-            }
-            if (StringUtil.isNotEmpty(standardIds)) {
-                String[] standardArray = standardIds.split(",");
-                standardIdList.addAll(Arrays.asList(standardArray));
-            }
-            if (StringUtil.isNotEmpty(accidentIds)) {
-                String[] accidentIdArray = accidentIds.split(",");
-                accidentIdList.addAll(Arrays.asList(accidentIdArray));
-            }
-        }
-        List<ScenePO> sceneList = new ArrayList<>();
-        if (CollectionUtil.isNotEmpty(naturalIdList)) {
-            sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
-        }
-        if (CollectionUtil.isNotEmpty(standardIdList)) {
-            sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
-        }
-        if (CollectionUtil.isNotEmpty(accidentIdList)) {
-            sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
-        }
-        projectMapper.updateTaskNumber(projectId, sceneList.size()); // 有多少场景就有多少任务
-        // -------------------------------- 2 模型 --------------------------------
-        // 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
-        String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
-        VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 模型
-        List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
-        List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
-
-        // -------------------------------- 3 任务消息 --------------------------------
-        // 根据场景创建任务,组装 task 消息
-        int maxSimulationTime = projectMessageDTO.getMaxSimulationTime();
-
-        for (ScenePO scenePO : sceneList) {
-            String taskId = StringUtil.getRandomUUID();
-            String resultPath = linuxTempPath + projectId + "/" + taskId;
-            // 保存任务信息
-            TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
-                    .id(taskId)
-                    .pId(projectId)
-                    .sceneId(scenePO.getId())
-                    .sceneName(scenePO.getName())
-                    .sceneType(scenePO.getType())
-                    .runState(DictConstants.TASK_PENDING)
-                    .runResult(resultPath)
-                    .build();
-            taskPO.setCreateTime(TimeUtil.getNowForMysql());
-            taskPO.setCreateUserId(USER_ID);
-            taskPO.setModifyTime(TimeUtil.getNowForMysql());
-            taskPO.setModifyUserId(USER_ID);
-            taskPO.setModifyTime(TimeUtil.getNowForMysql());
-            taskPO.setIsDeleted("0");
-            taskMapper.insert(taskPO);
-            // 心跳信息存在緩存中
-            redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId, TimeUtil.getNowString());
-            // 组装 task 消息
-            TaskTO taskTO = TaskTO.builder()
-                    .info(InfoTO.builder()
-                            .project_id(projectId)
-                            .task_id(taskId)
-                            .task_path(resultPath)
-                            .default_time(maxSimulationTime)
-                            .build())
-                    .scenario(ScenarioTO.builder()
-                            .scenario_osc(scenePO.getScenarioOsc())
-                            .scenario_odr(scenePO.getScenarioOdr())
-                            .scenario_osgb(scenePO.getScenarioOsgb())
-                            .build())
-                    .vehicle(VehicleTO.builder()
-                            .model(ModelTO.builder()
-                                    .model_label(vehiclePO.getModelLabel())
-                                    .build())
-                            .dynamics(DynamicsTO.builder()
-                                    .dynamics_maxspeed(vehiclePO.getMaxSpeed())
-                                    .dynamics_enginepower(vehiclePO.getEnginePower())
-                                    .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
-                                    .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
-                                    .dynamics_mass(vehiclePO.getMass())
-                                    .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
-                                    .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
-                                    .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
-                                    .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
-                                    .dynamics_wheeldrive(vehiclePO.getWheelDrive())
-                                    .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
-                                    .dynamics_distfront(vehiclePO.getFrontDistance())
-                                    .dynamics_distrear(vehiclePO.getRearDistance())
-                                    .dynamics_distleft(vehiclePO.getLeftDistance())
-                                    .dynamics_distright(vehiclePO.getRightDistance())
-                                    .dynamics_distheight(vehiclePO.getHeightDistance())
-                                    .dynamics_wheelbase(vehiclePO.getWheelbase())
-                                    .build())
-                            .sensors(SensorsTO.builder()   // 根据 vehicleId 查询绑定的传感器列表
-                                    .camera(cameraPOList)
-                                    .OGT(ogtPOList)
-                                    .build())
-                            .build())
-                    .build();
-            //4-4 将对象转成 json
-            String taskJson = JsonUtil.beanToJson(taskTO);
-            //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
-            kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
-//            kafkaTemplate.send("test", taskJson).addCallback(success -> {
-                // 消息发送到的topic
-                String topic = success.getRecordMetadata().topic();
-                // 消息发送到的分区
-                int partition = success.getRecordMetadata().partition();
-                // 消息在分区内的offset
-                long offset = success.getRecordMetadata().offset();
-                log.info("------- 发送消息成功:\n"
-                        + "主题 topic 为:" + topic + "\n"
-                        + "分区 partition 为:" + partition + "\n"
-                        + "偏移量为:" + offset + "\n"
-                        + "消息体为:" + taskJson);
-            }, failure -> {
-                log.error("------- 发送消息失败:" + failure.getMessage());
-            });
-        }
-
-        // -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
-//        // 私有仓库导入算法镜像(搭建私有仓库)
-//        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
-//        //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
-//        AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
-//        String minioPath = algorithmPO.getMinioPath();
-//        String dockerImage;
-//        if ("0".equals(algorithmPO.getDockerImport())) {
-//            dockerImage = "algorithm_" + algorithmId + ":latest";
-//            String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
-//            // 下载算法文件到本地( 2 到仓库服务器)
-//            MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
-//            //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
-//            LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
-//        } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
-//            dockerImage = algorithmPO.getDockerImage();
-//        } else {
-//            throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
-//        }
-        // -------------------------------- 5 创建 pod 开始执行 --------------------------------
-//        int completions = sceneList.size();     // 结束标
-//        int parallelism = projectMessageDTO.getParallelism();    // 并行度
-//        BatchV1Api batchV1Api = new BatchV1Api(apiClient);
-////        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
-//        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-test.yaml"));
-//        //1 apiVersion
-//        //2 kind
-//        //3 metadata
-//        V1ObjectMeta metadata = yaml.getMetadata();
-//        metadata.setName("project_" + projectId);
-//        yaml.setMetadata(metadata);
-//        //4 job
-//        V1JobSpec job = yaml.getSpec();
-//        job.setCompletions(completions); // 这个标准是什么?
-//        job.setParallelism(parallelism);
-//        //5 pod
-//        V1PodSpec v1PodSpec = job.getTemplate().getSpec();
-//        //6 container
-//        List<V1Container> containers = v1PodSpec.getContainers();
-//        for (V1Container container : containers) {
-//            String name = container.getName();
-//            if ("vtd".equals(name)) {
-//                container.setName("vtd_" + projectId);
-//            }
-//            if ("algorithm".equals(name)) {
-//                container.setName("algorithm_" + projectId);
-////                container.setImage(dockerImage);
-//            }
-//        }
-//        //4-4 创建
-//        yaml.setSpec(job);
-//        batchV1Api.createNamespacedJob("simulation-cloud", yaml, null, null, null);
-    }
-
 
 }

+ 0 - 6
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/KafkaController.java

@@ -23,12 +23,6 @@ public class KafkaController {
     ManualProjectConsumer manualProjectConsumer;
 
 
-    @PostMapping("/project")
-    @SneakyThrows
-    public void project(@RequestBody ProjectMessageDTO projectMessageDTO) {
-        manualProjectConsumer.parseProjectService(JsonUtil.beanToJson(projectMessageDTO));
-    }
-
     @PostMapping("/hello")
     public void hello() {
         kafkaTemplate.send("hello", "hello world!").addCallback(success -> {

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -126,7 +126,7 @@ public class TaskService {
                         taskMapper.updateState(task2Id, DictConstants.TASK_ANALYSING);
                         // 计算每个任务的得分
                         ScoreTO score;
-                        String runResultMinio = task2.getRunResult();
+                        String runResultMinio = task2.getRunResult() + "/Ego.csv";
                         String runResultLinux = linuxTempPath + runResultMinio;
 //                        python3 /home/ubuntu/test/Evaluate/main.py /home/ubuntu/test/test_data.csv 4 AEB_1-2
 //                        String command = "python3 " + pyPath + " " + runResultLinux + " " + task2.getSceneType();  // 默认使用场景名称找打分脚本

+ 6 - 5
simulation-resource-scheduler/src/main/resources/kubernetes/template/job/job-template.yaml

@@ -1,4 +1,3 @@
----
 apiVersion: batch/v1
 kind: Job
 metadata:
@@ -7,12 +6,14 @@ metadata:
   labels:
     user: EY
 spec:
-  completions: 1
-  parallelism: 1
   template:
     metadata:
       name: pod-cloud-simulation
     spec:
+      hostAliases:
+        - ip: 172.17.0.184
+          hostnames:
+            - cicvtest002
       containers:
         - name: vtd-container
           image: vtd.run.perception:latest
@@ -31,7 +32,7 @@ spec:
         - name: algorithm-container
           image: aeb.ros:latest
           imagePullPolicy: Never
-          command: [ "/AEB/start_docker.sh" ]
+          command: [ "/bin/sh", "-c", "/AEB/start_docker.sh; touch /tmp/hello.txt;while true;do /bin/echo $(date +%T) >> /tmp/hello.txt; sleep 5; done;" ]
       restartPolicy: Never
       volumes:
         - name: nvidia0
@@ -39,4 +40,4 @@ spec:
             path: /dev/nvidia0
         - name: nvidiactl
           hostPath:
-            path: /dev/nvidiactl
+            path: /dev/nvidiactl