martin %!s(int64=3) %!d(string=hai) anos
pai
achega
3d855e8a28

+ 192 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -68,9 +68,197 @@ public class ManualProjectConsumer {
         System.out.println("------- 消费成功:" + projectRecord.value());
     }
 
-//    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.topic}")
-//    public void parseProject(ConsumerRecord<String, String> projectRecord) throws IOException, ApiException {
-    public void parseProject(String projectJson) throws IOException, ApiException {
+    //    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.topic}")
+    public void parseProject(ConsumerRecord<String, String> projectRecord) throws IOException, ApiException {
+//    public void parseProject(String projectJson) throws IOException, ApiException {
+        System.out.println("------- 接收到消息为:" + projectRecord);
+//        System.out.println("------- 接收到消息为:" + projectJson);
+        //1 读取 kafka 的 project 信息
+        /*
+            {
+                "projectId": "sadfasdfs",	// 项目 id
+                "algorithmId": "sadfasdfs",	// 算法 id
+                "vehicleConfigId": "sadfasdfs",	// 车辆 id
+                "scenePackageId": "sadfasdfs",	// 场景包 id
+                "maxSimulationTime": 11111,	// 最大仿真时间
+                "parallelism": 30		// 并行度
+            }
+         */
+        String projectJson = projectRecord.value();
+        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
+        String projectId = projectMessageDTO.getProjectId();    // 项目 id
+        projectMapper.updateProjectState(projectId, "20");   // 修改该 project 的状态为执行中
+
+
+        // -------------------------------- 1 场景 --------------------------------
+        // 根据 packageId,拿到场景 id,然后获取每个场景的文件地址。
+        String packageId = projectMessageDTO.getScenePackageId();
+        List<IndexTemplatePO> leafIndexList = indexTemplateMapper.selectLeafIndexByPackageId(packageId);
+        List<String> naturalIdList = new ArrayList<>();
+        List<String> standardIdList = new ArrayList<>();
+        List<String> accidentIdList = new ArrayList<>();
+        for (IndexTemplatePO indexTemplatePO : leafIndexList) {
+            String naturalIds = indexTemplatePO.getSceneNaturalIds();
+            String standardIds = indexTemplatePO.getSceneStatueIds();
+            String accidentIds = indexTemplatePO.getSceneTrafficIds();
+            if (StringUtil.isNotEmpty(naturalIds)) {
+                String[] naturalIdArray = naturalIds.split(",");
+                naturalIdList.addAll(Arrays.asList(naturalIdArray));
+            }
+            if (StringUtil.isNotEmpty(standardIds)) {
+                String[] standardArray = standardIds.split(",");
+                standardIdList.addAll(Arrays.asList(standardArray));
+            }
+            if (StringUtil.isNotEmpty(accidentIds)) {
+                String[] accidentIdArray = accidentIds.split(",");
+                accidentIdList.addAll(Arrays.asList(accidentIdArray));
+            }
+        }
+        List<ScenePO> sceneList = new ArrayList<>();
+        sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
+        sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
+        sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
+        projectMapper.updateTaskNumber(projectId, sceneList.size()); // 有多少场景就有多少任务
+        // -------------------------------- 2 模型 --------------------------------
+        // 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
+        String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
+        VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 模型
+        List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
+        List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
+
+        // -------------------------------- 3 任务消息 --------------------------------
+        // 根据场景创建任务,组装 task 消息
+        int maxSimulationTime = projectMessageDTO.getMaxSimulationTime();
+
+        for (ScenePO scenePO : sceneList) {
+            String taskId = StringUtil.getRandomUUID();
+            String resultPath = linuxTempPath + projectId + "/" + taskId;
+            // 保存任务信息
+            TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
+                    .id(taskId)
+                    .pId(projectId)
+                    .sceneId(scenePO.getId())
+                    .sceneName(scenePO.getName())
+                    .sceneType(scenePO.getType())
+                    .runState(DictConstants.TASK_PENDING)
+                    .runResult(resultPath)
+                    .build();
+            taskPO.setCreateTime(TimeUtil.getNowForMysql());
+            taskPO.setCreateUserId(USER_ID);
+            taskPO.setModifyTime(TimeUtil.getNowForMysql());
+            taskPO.setModifyUserId(USER_ID);
+            taskPO.setModifyTime(TimeUtil.getNowForMysql());
+            taskPO.setIsDeleted("0");
+            taskMapper.insert(taskPO);
+            // 心跳信息存在緩存中
+            commonService.set(new RedisParameter(manualProjectTopic + ":" + projectId + ":" + taskId, TimeUtil.getNow() + "", 0));
+            // 组装 task 消息
+            TaskTO taskTO = TaskTO.builder()
+                    .info(InfoTO.builder()
+                            .project_id(projectId)
+                            .task_id(taskId)
+                            .task_path(resultPath)
+                            .default_time(maxSimulationTime)
+                            .build())
+                    .scenario(ScenarioTO.builder()
+                            .scenario_osc(scenePO.getScenarioOsc())
+                            .scenario_odr(scenePO.getScenarioOdr())
+                            .scenario_osgb(scenePO.getScenarioOsgb())
+                            .build())
+                    .vehicle(VehicleTO.builder()
+                            .model(ModelTO.builder()
+                                    .model_label(vehiclePO.getModelLabel())
+                                    .build())
+                            .dynamics(DynamicsTO.builder()
+                                    .dynamics_maxspeed(vehiclePO.getMaxSpeed())
+                                    .dynamics_enginepower(vehiclePO.getEnginePower())
+                                    .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
+                                    .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
+                                    .dynamics_mass(vehiclePO.getMass())
+                                    .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
+                                    .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
+                                    .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
+                                    .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
+                                    .dynamics_wheeldrive(vehiclePO.getWheelDrive())
+                                    .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
+                                    .dynamics_distfront(vehiclePO.getFrontDistance())
+                                    .dynamics_distrear(vehiclePO.getRearDistance())
+                                    .dynamics_distleft(vehiclePO.getLeftDistance())
+                                    .dynamics_distright(vehiclePO.getRightDistance())
+                                    .dynamics_distheight(vehiclePO.getHeightDistance())
+                                    .dynamics_wheelbase(vehiclePO.getWheelbase())
+                                    .build())
+                            .sensors(SensorsTO.builder()   // 根据 vehicleId 查询绑定的传感器列表
+                                    .camera(cameraPOList)
+                                    .OGT(ogtPOList)
+                                    .build())
+                            .build())
+                    .build();
+            //4-4 将对象转成 json
+            String taskJson = JsonUtil.beanToJson(taskTO);
+            //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
+            commonService.send(new KafkaParameter(projectId, taskJson));
+        }
+
+        // -------------------------------- 4 算法(一期按单机版做) --------------------------------
+        // 私有仓库导入算法镜像(搭建私有仓库)
+        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
+        //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
+        AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
+        String minioPath = algorithmPO.getMinioPath();
+        String dockerImage;
+        if ("0".equals(algorithmPO.getDockerImport())) {
+            dockerImage = "algorithm_" + algorithmId + ":latest";
+            String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
+            // 下载算法文件到本地( 2 到仓库服务器)
+            Response response = commonService.download(new MinioParameter(minioPath));
+            InputStream inputStream = response.body().asInputStream();
+            FileUtil.writeInputStreamToLocalFile(inputStream, algorithmTarLinuxTempPath);
+            //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
+            LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
+        } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
+            dockerImage = algorithmPO.getDockerImage();
+        } else {
+            throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
+        }
+        // -------------------------------- 5 创建 pod 开始执行 --------------------------------
+        int completions = sceneList.size();     // 结束标
+        int parallelism = projectMessageDTO.getParallelism();    // 并行度
+        BatchV1Api batchV1Api = new BatchV1Api(apiClient);
+        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
+        //1 apiVersion
+        //2 kind
+        //3 metadata
+        V1ObjectMeta metadata = yaml.getMetadata();
+        metadata.setName("project_" + projectId);
+        yaml.setMetadata(metadata);
+        //4 job
+        V1JobSpec job = yaml.getSpec();
+        job.setCompletions(completions); // 这个标准是什么?
+        job.setParallelism(parallelism);
+        //5 pod
+        V1PodSpec v1PodSpec = job.getTemplate().getSpec();
+        //6 container
+        List<V1Container> containers = v1PodSpec.getContainers();
+        for (V1Container container : containers) {
+            String name = container.getName();
+            if ("vtd".equals(name)) {
+                container.setName("vtd_" + projectId);
+            }
+            if ("algorithm".equals(name)) {
+                container.setName("algorithm_" + projectId);
+                container.setImage(dockerImage);
+            }
+        }
+        //4-4 创建
+        yaml.setSpec(job);
+        batchV1Api.createNamespacedJob("simulation-task", yaml, null, null, null);
+
+    }
+
+
+//    public void testParseProject(ConsumerRecord<String, String> projectRecord) throws IOException, ApiException {
+    public void testParseProject(String projectJson) throws IOException, ApiException {
 //        System.out.println("------- 接收到消息为:" + projectRecord);
         System.out.println("------- 接收到消息为:" + projectJson);
         //1 读取 kafka 的 project 信息
@@ -257,4 +445,5 @@ public class ManualProjectConsumer {
     }
 
 
+
 }

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/TestConsumerController.java

@@ -22,6 +22,6 @@ public class TestConsumerController {
     @PostMapping("/consumer")
     public void hello(@RequestBody ProjectMessageDTO projectMessageDTO) throws IOException, ApiException {
 
-        manualProjectConsumer.parseProject(JsonUtil.beanToJson(projectMessageDTO));
+        manualProjectConsumer.testParseProject(JsonUtil.beanToJson(projectMessageDTO));
     }
 }