martin 3 سال پیش
والد
کامیت
6305b57508

+ 10 - 203
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -10,9 +10,6 @@ import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.*;
 import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.openapi.apis.BatchV1Api;
-import io.kubernetes.client.openapi.models.V1Job;
-import io.kubernetes.client.util.Yaml;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -23,7 +20,6 @@ import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
-import org.springframework.util.ResourceUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -68,15 +64,13 @@ public class ManualProjectConsumer {
 
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "hello")
     public void testConsumer(ConsumerRecord<String, String> projectRecord) {
-        System.out.println("------- 消费成功:" + projectRecord.value());
+        log.info("------- testConsumer 消费成功:" + projectRecord.value());
     }
 
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.topic}")
     @SneakyThrows
     public void parseProject(ConsumerRecord<String, String> projectRecord) {
-//    public void parseProject(String projectJson) throws IOException, ApiException {
-        System.out.println("------- 接收到消息为:" + projectRecord);
-//        System.out.println("------- 接收到消息为:" + projectJson);
+        log.info("------- parseProject 接收到消息为:" + projectRecord);
         //1 读取 kafka 的 project 信息
         /*
             {
@@ -201,7 +195,8 @@ public class ManualProjectConsumer {
             //4-4 将对象转成 json
             String taskJson = JsonUtil.beanToJson(taskTO);
             //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
-            kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
+//            kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
+            kafkaTemplate.send("test", taskJson).addCallback(success -> {
                 // 消息发送到的topic
                 String topic = success.getRecordMetadata().topic();
                 // 消息发送到的分区
@@ -237,198 +232,11 @@ public class ManualProjectConsumer {
 //            throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
 //        }
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
-        int completions = sceneList.size();     // 结束标
-        int parallelism = projectMessageDTO.getParallelism();    // 并行度
-        BatchV1Api batchV1Api = new BatchV1Api(apiClient);
-//        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
-        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-test.yaml"));
-//        //1 apiVersion
-//        //2 kind
-//        //3 metadata
-//        V1ObjectMeta metadata = yaml.getMetadata();
-//        metadata.setName("project_" + projectId);
-//        yaml.setMetadata(metadata);
-//        //4 job
-//        V1JobSpec job = yaml.getSpec();
-//        job.setCompletions(completions); // 这个标准是什么?
-//        job.setParallelism(parallelism);
-//        //5 pod
-//        V1PodSpec v1PodSpec = job.getTemplate().getSpec();
-//        //6 container
-//        List<V1Container> containers = v1PodSpec.getContainers();
-//        for (V1Container container : containers) {
-//            String name = container.getName();
-//            if ("vtd".equals(name)) {
-//                container.setName("vtd_" + projectId);
-//            }
-//            if ("algorithm".equals(name)) {
-//                container.setName("algorithm_" + projectId);
-////                container.setImage(dockerImage);
-//            }
-//        }
-//        //4-4 创建
-//        yaml.setSpec(job);
-        batchV1Api.createNamespacedJob("simulation-cloud", yaml, null, null, null);
-    }
-
-
-//    //    public void testParseProject(ConsumerRecord<String, String> projectRecord) throws IOException, ApiException {
-//    public void testParseProject(String projectJson) throws IOException, ApiException {
-////        System.out.println("------- 接收到消息为:" + projectRecord);
-//        System.out.println("------- 接收到消息为:" + projectJson);
-//        //1 读取 kafka 的 project 信息
-//        /*
-//            {
-//                "projectId": "sadfasdfs",	// 项目 id
-//                "algorithmId": "sadfasdfs",	// 算法 id
-//                "vehicleConfigId": "sadfasdfs",	// 车辆 id
-//                "scenePackageId": "sadfasdfs",	// 场景包 id
-//                "maxSimulationTime": 11111,	// 最大仿真时间
-//                "parallelism": 30		// 并行度
-//            }
-//         */
-////        String projectJson = projectRecord.value();
-//        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
-//        String projectId = projectMessageDTO.getProjectId();    // 项目 id
-//        projectMapper.updateProjectState(projectId, "20");   // 修改该 project 的状态为执行中
-//
-//
-//        // -------------------------------- 1 场景 --------------------------------
-//        // 根据 packageId,拿到场景 id,然后获取每个场景的文件地址。
-//        String packageId = projectMessageDTO.getScenePackageId();
-//        List<IndexTemplatePO> leafIndexList = indexTemplateMapper.selectLeafIndexByPackageId(packageId);
-//        List<String> naturalIdList = new ArrayList<>();
-//        List<String> standardIdList = new ArrayList<>();
-//        List<String> accidentIdList = new ArrayList<>();
-//        for (IndexTemplatePO indexTemplatePO : leafIndexList) {
-//            String naturalIds = indexTemplatePO.getSceneNaturalIds();
-//            String standardIds = indexTemplatePO.getSceneStatueIds();
-//            String accidentIds = indexTemplatePO.getSceneTrafficIds();
-//            if (StringUtil.isNotEmpty(naturalIds)) {
-//                String[] naturalIdArray = naturalIds.split(",");
-//                naturalIdList.addAll(Arrays.asList(naturalIdArray));
-//            }
-//            if (StringUtil.isNotEmpty(standardIds)) {
-//                String[] standardArray = standardIds.split(",");
-//                standardIdList.addAll(Arrays.asList(standardArray));
-//            }
-//            if (StringUtil.isNotEmpty(accidentIds)) {
-//                String[] accidentIdArray = accidentIds.split(",");
-//                accidentIdList.addAll(Arrays.asList(accidentIdArray));
-//            }
-//        }
-//        List<ScenePO> sceneList = new ArrayList<>();
-//        sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
-//        sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
-//        sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
-//        projectMapper.updateTaskNumber(projectId, sceneList.size()); // 有多少场景就有多少任务
-//        // -------------------------------- 2 模型 --------------------------------
-//        // 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
-//        String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
-//        VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 模型
-//        List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
-//        List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
-//
-//        // -------------------------------- 3 任务消息 --------------------------------
-//        // 根据场景创建任务,组装 task 消息
-//        int maxSimulationTime = projectMessageDTO.getMaxSimulationTime();
-//
-//        for (ScenePO scenePO : sceneList) {
-//            String taskId = StringUtil.getRandomUUID();
-//            String resultPath = linuxTempPath + projectId + "/" + taskId;
-//            // 保存任务信息
-//            TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
-//                    .id(taskId)
-//                    .pId(projectId)
-//                    .sceneId(scenePO.getId())
-//                    .sceneName(scenePO.getName())
-//                    .sceneType(scenePO.getType())
-//                    .runState(DictConstants.TASK_PENDING)
-//                    .runResult(resultPath)
-//                    .build();
-//            taskPO.setCreateTime(TimeUtil.getNowForMysql());
-//            taskPO.setCreateUserId(USER_ID);
-//            taskPO.setModifyTime(TimeUtil.getNowForMysql());
-//            taskPO.setModifyUserId(USER_ID);
-//            taskPO.setModifyTime(TimeUtil.getNowForMysql());
-//            taskPO.setIsDeleted("0");
-//            taskMapper.insert(taskPO);
-//            // 心跳信息存在緩存中
-//            commonService.set(new RedisParameter(manualProjectTopic + ":" + projectId + ":" + taskId, TimeUtil.getNow() + "", 0));
-//            // 组装 task 消息
-//            TaskTO taskTO = TaskTO.builder()
-//                    .info(InfoTO.builder()
-//                            .project_id(projectId)
-//                            .task_id(taskId)
-//                            .task_path(resultPath)
-//                            .default_time(maxSimulationTime)
-//                            .build())
-//                    .scenario(ScenarioTO.builder()
-//                            .scenario_osc(scenePO.getScenarioOsc())
-//                            .scenario_odr(scenePO.getScenarioOdr())
-//                            .scenario_osgb(scenePO.getScenarioOsgb())
-//                            .build())
-//                    .vehicle(VehicleTO.builder()
-//                            .model(ModelTO.builder()
-//                                    .model_label(vehiclePO.getModelLabel())
-//                                    .build())
-//                            .dynamics(DynamicsTO.builder()
-//                                    .dynamics_maxspeed(vehiclePO.getMaxSpeed())
-//                                    .dynamics_enginepower(vehiclePO.getEnginePower())
-//                                    .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
-//                                    .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
-//                                    .dynamics_mass(vehiclePO.getMass())
-//                                    .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
-//                                    .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
-//                                    .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
-//                                    .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
-//                                    .dynamics_wheeldrive(vehiclePO.getWheelDrive())
-//                                    .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
-//                                    .dynamics_distfront(vehiclePO.getFrontDistance())
-//                                    .dynamics_distrear(vehiclePO.getRearDistance())
-//                                    .dynamics_distleft(vehiclePO.getLeftDistance())
-//                                    .dynamics_distright(vehiclePO.getRightDistance())
-//                                    .dynamics_distheight(vehiclePO.getHeightDistance())
-//                                    .dynamics_wheelbase(vehiclePO.getWheelbase())
-//                                    .build())
-//                            .sensors(SensorsTO.builder()   // 根据 vehicleId 查询绑定的传感器列表
-//                                    .camera(cameraPOList)
-//                                    .OGT(ogtPOList)
-//                                    .build())
-//                            .build())
-//                    .build();
-//            //4-4 将对象转成 json
-//            String taskJson = JsonUtil.beanToJson(taskTO);
-//            //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
-//            commonService.send(new KafkaParameter(projectId, taskJson));
-//        }
-//
-//        // -------------------------------- 4 算法(一期按单机版做) --------------------------------
-//        // 私有仓库导入算法镜像(搭建私有仓库)
-//        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
-//        //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
-//        AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
-//        String minioPath = algorithmPO.getMinioPath();
-//        String dockerImage;
-//        if ("0".equals(algorithmPO.getDockerImport())) {
-//            dockerImage = "algorithm_" + algorithmId + ":latest";
-//            String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
-//            // 下载算法文件到本地( 2 到仓库服务器)
-//            Response response = commonService.download(new MinioParameter(minioPath));
-//            InputStream inputStream = response.body().asInputStream();
-//            FileUtil.writeInputStreamToLocalFile(inputStream, algorithmTarLinuxTempPath);
-//            //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
-//            LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
-//        } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
-//            dockerImage = algorithmPO.getDockerImage();
-//        } else {
-//            throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
-//        }
-//        // -------------------------------- 5 创建 pod 开始执行 --------------------------------
 //        int completions = sceneList.size();     // 结束标
 //        int parallelism = projectMessageDTO.getParallelism();    // 并行度
 //        BatchV1Api batchV1Api = new BatchV1Api(apiClient);
-//        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
+////        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
+//        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-test.yaml"));
 //        //1 apiVersion
 //        //2 kind
 //        //3 metadata
@@ -437,7 +245,7 @@ public class ManualProjectConsumer {
 //        yaml.setMetadata(metadata);
 //        //4 job
 //        V1JobSpec job = yaml.getSpec();
-//        job.setCompletions(completions);
+//        job.setCompletions(completions); // 这个标准是什么?
 //        job.setParallelism(parallelism);
 //        //5 pod
 //        V1PodSpec v1PodSpec = job.getTemplate().getSpec();
@@ -450,14 +258,13 @@ public class ManualProjectConsumer {
 //            }
 //            if ("algorithm".equals(name)) {
 //                container.setName("algorithm_" + projectId);
-//                container.setImage(dockerImage);
+////                container.setImage(dockerImage);
 //            }
 //        }
 //        //4-4 创建
 //        yaml.setSpec(job);
-//        batchV1Api.createNamespacedJob("simulation-task", yaml, null, null, null);
-//
-//    }
+//        batchV1Api.createNamespacedJob("simulation-cloud", yaml, null, null, null);
+    }
 
 
 }