martin 3 jaren geleden
bovenliggende
commit
97725da036

+ 2 - 1
simulation-resource-common/src/main/java/com/css/simulation/resource/common/controller/KafkaController.java

@@ -31,7 +31,8 @@ public class KafkaController {
             log.info("------- 发送消息成功:\n"
                     + "主题 topic 为:" + topic + "\n"
                     + "分区 partition 为:" + partition + "\n"
-                    + "偏移量为:" + offset);
+                    + "偏移量为:" + offset + "\n"
+                    + "消息体为:" + kafkaParameter.getData());
         }, failure -> {
             log.error("发送消息失败:" + failure.getMessage());
         });

+ 204 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -206,7 +206,210 @@ public class ManualProjectConsumer {
                 log.info("------- 发送消息成功:\n"
                         + "主题 topic 为:" + topic + "\n"
                         + "分区 partition 为:" + partition + "\n"
-                        + "偏移量为:" + offset);
+                        + "偏移量为:" + offset + "\n"
+                        + "消息体为:" + taskJson);
+            }, failure -> {
+                log.error("------- 发送消息失败:" + failure.getMessage());
+            });
+        }
+
+        // -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
+//        // 私有仓库导入算法镜像(搭建私有仓库)
+//        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
+//        //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
+//        AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
+//        String minioPath = algorithmPO.getMinioPath();
+//        String dockerImage;
+//        if ("0".equals(algorithmPO.getDockerImport())) {
+//            dockerImage = "algorithm_" + algorithmId + ":latest";
+//            String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
+//            // 下载算法文件到本地( 2 到仓库服务器)
+//            MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
+//            //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
+//            LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
+//        } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
+//            dockerImage = algorithmPO.getDockerImage();
+//        } else {
+//            throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
+//        }
+        // -------------------------------- 5 创建 pod 开始执行 --------------------------------
+//        int completions = sceneList.size();     // 结束标
+//        int parallelism = projectMessageDTO.getParallelism();    // 并行度
+//        BatchV1Api batchV1Api = new BatchV1Api(apiClient);
+////        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
+//        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-test.yaml"));
+//        //1 apiVersion
+//        //2 kind
+//        //3 metadata
+//        V1ObjectMeta metadata = yaml.getMetadata();
+//        metadata.setName("project_" + projectId);
+//        yaml.setMetadata(metadata);
+//        //4 job
+//        V1JobSpec job = yaml.getSpec();
+//        job.setCompletions(completions); // 这个标准是什么?
+//        job.setParallelism(parallelism);
+//        //5 pod
+//        V1PodSpec v1PodSpec = job.getTemplate().getSpec();
+//        //6 container
+//        List<V1Container> containers = v1PodSpec.getContainers();
+//        for (V1Container container : containers) {
+//            String name = container.getName();
+//            if ("vtd".equals(name)) {
+//                container.setName("vtd_" + projectId);
+//            }
+//            if ("algorithm".equals(name)) {
+//                container.setName("algorithm_" + projectId);
+////                container.setImage(dockerImage);
+//            }
+//        }
+//        //4-4 创建
+//        yaml.setSpec(job);
+//        batchV1Api.createNamespacedJob("simulation-cloud", yaml, null, null, null);
+    }
+
+
+    @SneakyThrows
+//    public void parseProject(ConsumerRecord<String, String> projectRecord) {
+    public void parseProjectService(String projectJson) {
+        log.info("------- parseProject 接收到消息为:" + projectJson);
+//        log.info("------- parseProject 接收到消息为:" + projectRecord);
+        //1 读取 kafka 的 project 信息
+        /*
+            {
+                "projectId": "sadfasdfs",	// 项目 id
+                "algorithmId": "sadfasdfs",	// 算法 id
+                "vehicleConfigId": "sadfasdfs",	// 车辆 id
+                "scenePackageId": "sadfasdfs",	// 场景包 id
+                "maxSimulationTime": 11111,	// 最大仿真时间
+                "parallelism": 30		// 并行度
+            }
+         */
+//        String projectJson = projectRecord.value();
+        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
+        String projectId = projectMessageDTO.getProjectId();    // 项目 id
+        projectMapper.updateProjectState(projectId, "20");   // 修改该 project 的状态为执行中
+
+
+        // -------------------------------- 1 场景 --------------------------------
+        // 根据 packageId,拿到场景 id,然后获取每个场景的文件地址。
+        String packageId = projectMessageDTO.getScenePackageId();
+        List<IndexTemplatePO> leafIndexList = indexTemplateMapper.selectLeafIndexByPackageId(packageId);
+        List<String> naturalIdList = new ArrayList<>();
+        List<String> standardIdList = new ArrayList<>();
+        List<String> accidentIdList = new ArrayList<>();
+        for (IndexTemplatePO indexTemplatePO : leafIndexList) {
+            String naturalIds = indexTemplatePO.getSceneNaturalIds();
+            String standardIds = indexTemplatePO.getSceneStatueIds();
+            String accidentIds = indexTemplatePO.getSceneTrafficIds();
+            if (StringUtil.isNotEmpty(naturalIds)) {
+                String[] naturalIdArray = naturalIds.split(",");
+                naturalIdList.addAll(Arrays.asList(naturalIdArray));
+            }
+            if (StringUtil.isNotEmpty(standardIds)) {
+                String[] standardArray = standardIds.split(",");
+                standardIdList.addAll(Arrays.asList(standardArray));
+            }
+            if (StringUtil.isNotEmpty(accidentIds)) {
+                String[] accidentIdArray = accidentIds.split(",");
+                accidentIdList.addAll(Arrays.asList(accidentIdArray));
+            }
+        }
+        List<ScenePO> sceneList = new ArrayList<>();
+        sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
+        sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
+        sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
+        projectMapper.updateTaskNumber(projectId, sceneList.size()); // 有多少场景就有多少任务
+        // -------------------------------- 2 模型 --------------------------------
+        // 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
+        String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
+        VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 模型
+        List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
+        List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
+
+        // -------------------------------- 3 任务消息 --------------------------------
+        // 根据场景创建任务,组装 task 消息
+        int maxSimulationTime = projectMessageDTO.getMaxSimulationTime();
+
+        for (ScenePO scenePO : sceneList) {
+            String taskId = StringUtil.getRandomUUID();
+            String resultPath = linuxTempPath + projectId + "/" + taskId;
+            // 保存任务信息
+            TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
+                    .id(taskId)
+                    .pId(projectId)
+                    .sceneId(scenePO.getId())
+                    .sceneName(scenePO.getName())
+                    .sceneType(scenePO.getType())
+                    .runState(DictConstants.TASK_PENDING)
+                    .runResult(resultPath)
+                    .build();
+            taskPO.setCreateTime(TimeUtil.getNowForMysql());
+            taskPO.setCreateUserId(USER_ID);
+            taskPO.setModifyTime(TimeUtil.getNowForMysql());
+            taskPO.setModifyUserId(USER_ID);
+            taskPO.setModifyTime(TimeUtil.getNowForMysql());
+            taskPO.setIsDeleted("0");
+            taskMapper.insert(taskPO);
+            // 心跳信息存在緩存中
+            redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId, TimeUtil.getNowString());
+            // 组装 task 消息
+            TaskTO taskTO = TaskTO.builder()
+                    .info(InfoTO.builder()
+                            .project_id(projectId)
+                            .task_id(taskId)
+                            .task_path(resultPath)
+                            .default_time(maxSimulationTime)
+                            .build())
+                    .scenario(ScenarioTO.builder()
+                            .scenario_osc(scenePO.getScenarioOsc())
+                            .scenario_odr(scenePO.getScenarioOdr())
+                            .scenario_osgb(scenePO.getScenarioOsgb())
+                            .build())
+                    .vehicle(VehicleTO.builder()
+                            .model(ModelTO.builder()
+                                    .model_label(vehiclePO.getModelLabel())
+                                    .build())
+                            .dynamics(DynamicsTO.builder()
+                                    .dynamics_maxspeed(vehiclePO.getMaxSpeed())
+                                    .dynamics_enginepower(vehiclePO.getEnginePower())
+                                    .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
+                                    .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
+                                    .dynamics_mass(vehiclePO.getMass())
+                                    .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
+                                    .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
+                                    .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
+                                    .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
+                                    .dynamics_wheeldrive(vehiclePO.getWheelDrive())
+                                    .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
+                                    .dynamics_distfront(vehiclePO.getFrontDistance())
+                                    .dynamics_distrear(vehiclePO.getRearDistance())
+                                    .dynamics_distleft(vehiclePO.getLeftDistance())
+                                    .dynamics_distright(vehiclePO.getRightDistance())
+                                    .dynamics_distheight(vehiclePO.getHeightDistance())
+                                    .dynamics_wheelbase(vehiclePO.getWheelbase())
+                                    .build())
+                            .sensors(SensorsTO.builder()   // 根据 vehicleId 查询绑定的传感器列表
+                                    .camera(cameraPOList)
+                                    .OGT(ogtPOList)
+                                    .build())
+                            .build())
+                    .build();
+            //4-4 将对象转成 json
+            String taskJson = JsonUtil.beanToJson(taskTO);
+            //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
+//            kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
+            kafkaTemplate.send("test", taskJson).addCallback(success -> {
+                // 消息发送到的topic
+                String topic = success.getRecordMetadata().topic();
+                // 消息发送到的分区
+                int partition = success.getRecordMetadata().partition();
+                // 消息在分区内的offset
+                long offset = success.getRecordMetadata().offset();
+                log.info("------- 发送消息成功:\n"
+                        + "主题 topic 为:" + topic + "\n"
+                        + "分区 partition 为:" + partition + "\n"
+                        + "偏移量为:" + offset + "\n"
+                        + "消息体为:" + taskJson);
             }, failure -> {
                 log.error("------- 发送消息失败:" + failure.getMessage());
             });

+ 7 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/KafkaController.java

@@ -22,6 +22,13 @@ public class KafkaController {
     @Autowired
     ManualProjectConsumer manualProjectConsumer;
 
+
+    @PostMapping("/project")
+    @SneakyThrows
+    public void project(@RequestBody ProjectMessageDTO projectMessageDTO) {
+        manualProjectConsumer.parseProjectService(JsonUtil.beanToJson(projectMessageDTO));
+    }
+
     @PostMapping("/hello")
     public void hello() {
         kafkaTemplate.send("hello", "hello world!").addCallback(success -> {

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/SceneMapper.java

@@ -92,7 +92,7 @@ public interface SceneMapper {
             "from scene_accident\n" +
             "where is_deleted = '0'\n" +
             "<if test='idList != null and idList.size() > 0'>\n" +
-            "   and regulations_id in\n" +
+            "   and accident_id in\n" +
             "       <foreach collection='idList' index='index' item='item' open='(' close=')' separator=','>\n" +
             "           #{item}" +
             "       </foreach>\n" +