root 2 سال پیش
والد
کامیت
fc002b5cc3

+ 22 - 18
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -449,6 +449,8 @@ public class ProjectConsumer {
         }
         }
     }
     }
 
 
+    //* -------------------------------- 运行 --------------------------------
+
     /**
     /**
      * @param projectMessageDTO 初始接收到的项目启动信息
      * @param projectMessageDTO 初始接收到的项目启动信息
      * @param clusterId         集群 id
      * @param clusterId         集群 id
@@ -473,16 +475,6 @@ public class ProjectConsumer {
         }
         }
     }
     }
 
 
-    /**
-     * @param projectWaitingKey 项目等待 key
-     * @param projectMessageDTO 项目信息
-     */
-    @SneakyThrows
-    public void wait(String projectWaitingKey, ProjectMessageDTO projectMessageDTO) {
-        stringRedisTemplate.opsForValue().set(projectWaitingKey, JsonUtil.beanToJson(projectMessageDTO));
-    }
-
-
     /**
     /**
      * @param projectMessageDTO 初始接收到的项目启动信息
      * @param projectMessageDTO 初始接收到的项目启动信息
      * @param projectRunningKey projectRunningKey
      * @param projectRunningKey projectRunningKey
@@ -557,8 +549,7 @@ public class ProjectConsumer {
             String topic = recordMetadata.topic();  // 消息发送到的topic
             String topic = recordMetadata.topic();  // 消息发送到的topic
             int partition = recordMetadata.partition(); // 消息发送到的分区
             int partition = recordMetadata.partition(); // 消息发送到的分区
             long offset = recordMetadata.offset();  // 消息在分区内的offset
             long offset = recordMetadata.offset();  // 消息在分区内的offset
-            log.info("parseProject() 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:"
-                    + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
+            log.info("parseProject() 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
             //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
             //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
             // 选一个 count 最少的 node
             // 选一个 count 最少的 node
             String currentNodeName = "";
             String currentNodeName = "";
@@ -576,27 +567,40 @@ public class ProjectConsumer {
             }
             }
             currentNodeTO.setCount(currentNodeTO.getCount() + 1);
             currentNodeTO.setCount(currentNodeTO.getCount() + 1);
 
 
-            log.info("项目 " + projectId + " 准备创建 yaml:是否使用 gpu " + isChoiceGpu + ",当前节点名称为:" + currentNodeName + ",当前节点已创建 yaml 个数为:" + currentCount);
+            log.info("parseProject() 项目 " + projectId + " 准备创建 yaml:是否使用 gpu " + isChoiceGpu + ",当前节点名称为:" + currentNodeName + ",当前节点已创建 yaml 个数为:" + currentCount);
             String tempYaml = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu);
             String tempYaml = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu);
             if (currentCount == 0) {
             if (currentCount == 0) {
-                log.info("加入到启动列表 " + tempYaml);
+                log.info("parseProject() 加入到启动列表 " + tempYaml);
                 yamlListToRun.add(tempYaml);
                 yamlListToRun.add(tempYaml);
             }
             }
             messageNumber++;
             messageNumber++;
         }
         }
         TimeUnit.SECONDS.sleep(6);
         TimeUnit.SECONDS.sleep(6);
-        log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息。");
-        log.info("项目 " + projectId + " 准备首先启动 " + yamlListToRun);
+        log.info("parseProject() 项目 " + projectId + " 共发送了 " + messageNumber + " 条消息。");
+        log.info("parseProject() 项目 " + projectId + " 准备首先启动 " + yamlListToRun);
         for (String yaml : yamlListToRun) {
         for (String yaml : yamlListToRun) {
             projectUtil.createPod2(yaml);
             projectUtil.createPod2(yaml);
         }
         }
-        log.info("项目 " + projectId + " 已经启动 " + yamlListToRun);
+        log.info("parseProject() 项目 " + projectId + " 已经启动 " + yamlListToRun);
+    }
+
+
+    //* -------------------------------- 等待 --------------------------------
+
+    /**
+     * @param projectWaitingKey 项目等待 key
+     * @param projectMessageDTO 项目信息
+     */
+    @SneakyThrows
+    public void wait(String projectWaitingKey, ProjectMessageDTO projectMessageDTO) {
+        stringRedisTemplate.opsForValue().set(projectWaitingKey, JsonUtil.beanToJson(projectMessageDTO));
     }
     }
+    //* -------------------------------- 结束 --------------------------------
 
 
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")
     @SneakyThrows
     @SneakyThrows
     public void stopProject(ConsumerRecord<String, String> stopRecord) {
     public void stopProject(ConsumerRecord<String, String> stopRecord) {
-        log.info("接收到的项目终止消息为:" + stopRecord);
+        log.info("stopProject() 接收到的项目终止消息为:" + stopRecord);
         //1 读取 kafka 的项目停止信息
         //1 读取 kafka 的项目停止信息
         /*
         /*
             {
             {

+ 617 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumerBackup.java

@@ -0,0 +1,617 @@
+//package com.css.simulation.resource.scheduler.consumer;
+//
+//
+//import api.common.pojo.constants.DictConstants;
+//import api.common.pojo.dto.ProjectMessageDTO;
+//import api.common.util.*;
+//import com.css.simulation.resource.scheduler.manager.ProjectManager;
+//import com.css.simulation.resource.scheduler.manager.TaskManager;
+//import com.css.simulation.resource.scheduler.mapper.*;
+//import com.css.simulation.resource.scheduler.pojo.po.*;
+//import com.css.simulation.resource.scheduler.pojo.to.*;
+//import com.css.simulation.resource.scheduler.service.ProjectService;
+//import com.css.simulation.resource.scheduler.util.ApacheKafkaUtil;
+//import com.css.simulation.resource.scheduler.util.MinioUtil;
+//import com.css.simulation.resource.scheduler.util.ProjectUtil;
+//import com.fasterxml.jackson.databind.JsonNode;
+//import com.fasterxml.jackson.databind.ObjectMapper;
+//import io.minio.MinioClient;
+//import lombok.SneakyThrows;
+//import lombok.extern.slf4j.Slf4j;
+//import org.apache.kafka.clients.admin.Admin;
+//import org.apache.kafka.clients.consumer.ConsumerRecord;
+//import org.apache.kafka.clients.producer.RecordMetadata;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.data.redis.core.StringRedisTemplate;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.kafka.core.KafkaTemplate;
+//import org.springframework.kafka.support.SendResult;
+//import org.springframework.stereotype.Component;
+//
+//import javax.annotation.Resource;
+//import java.util.*;
+//import java.util.concurrent.CopyOnWriteArrayList;
+//import java.util.concurrent.TimeUnit;
+//
+//@Component
+//@Slf4j
+//public class ProjectConsumerBackup {
+//    @Value("${scheduler.linux-path.temp}")
+//    private String linuxTempPath;
+//    @Value("${scheduler.minio-path.project-result}")
+//    private String projectResultPathOfMinio;
+//    @Value("${minio.bucket-name}")
+//    private String bucketName;
+//
+//    // -------------------------------- Comment --------------------------------
+//    @Resource
+//    private MinioClient minioClient;
+//    @Resource
+//    private StringRedisTemplate stringRedisTemplate;
+//    @Resource
+//    private ManualProjectMapper manualProjectMapper;
+//    @Resource
+//    private AutoSubProjectMapper autoSubProjectMapper;
+//    @Resource
+//    private VehicleMapper vehicleMapper;
+//    @Resource
+//    private SensorCameraMapper sensorCameraMapper;
+//    @Resource
+//    private SensorOgtMapper sensorOgtMapper;
+//    @Resource
+//    private AlgorithmMapper algorithmMapper;
+//    @Resource
+//    private UserMapper userMapper;
+//    @Resource
+//    private ClusterMapper clusterMapper;
+//    @Resource
+//    private ProjectService projectService;
+//    @Resource
+//    private ProjectUtil projectUtil;
+//    @Resource
+//    private IndexMapper indexMapper;
+//    @Resource
+//    private TaskMapper taskMapper;
+//    @Resource
+//    private TaskManager taskManager;
+//    @Resource
+//    private ProjectManager projectManager;
+//    @Resource
+//    private KafkaTemplate<String, String> kafkaTemplate;
+//    @Resource(name = "myKafkaAdmin")
+//    private Admin kafkaAdminClient;
+//
+//    /**
+//     * 接收到运行信息立即复制一份数据作为运行数据
+//     *
+//     * @param projectRecord 项目启动消息
+//     */
+//    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.start-topic}")
+//    @SneakyThrows
+//    public void acceptMessage(ConsumerRecord<String, String> projectRecord) {
+//        String initialProjectJson = projectRecord.value();
+//        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(initialProjectJson, ProjectMessageDTO.class);
+//        String projectId = projectMessageDTO.getProjectId();        // 手动执行项目 id 或 自动执行子项目 id
+//        new Thread(() -> createTaskAndFixData(projectRecord), "fix-" + projectId).start();
+//    }
+//
+//
+//    @SneakyThrows
+//    public void createTaskAndFixData(ConsumerRecord<String, String> projectRecord) {
+//        //* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
+//        String initialProjectJson = projectRecord.value();
+//        log.info("createTaskAndFixData() 接收到项目开始消息为:" + initialProjectJson);
+//        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(initialProjectJson, ProjectMessageDTO.class);
+//        String projectId = projectMessageDTO.getProjectId();                // 手动执行项目 id 或 自动执行子项目 id
+//        String modelType = projectMessageDTO.getModelType();                // 模型类型,1 动力学模型 2 carsim模型
+//        String packageId = projectMessageDTO.getScenePackageId();           // 场景测试包 id
+//        String vehicleConfigId = projectMessageDTO.getVehicleConfigId();    // 模型配置 id
+//        String algorithmId = projectMessageDTO.getAlgorithmId();            // 模型配置 id
+//        long videoTime = projectMessageDTO.getMaxSimulationTime();          // 结果视频的时长
+//        String projectType = projectMessageDTO.getType();                   // 项目类型
+//        String userId = "";  // 用户 id
+//        if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
+//            userId = manualProjectMapper.selectCreateUserById(projectId);
+//        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
+//            userId = autoSubProjectMapper.selectCreateUserById(projectId);
+//        }
+//        String projectPath = linuxTempPath + "project/" + projectId + "/";
+//        FileUtil.mkdir(projectPath);
+//        //5 将该 project 下所有旧的指标得分删除。
+//        taskMapper.deleteByProject(projectId);
+//        indexMapper.deleteFirstTargetScoreByProjectId(projectId);
+//        indexMapper.deleteLastTargetScoreByProjectId(projectId);
+//        // -------------------------------- 1 查询场景 --------------------------------
+//        log.info("createTaskAndFixData() 项目 " + projectId + " 开始查询场景。");
+//        //根据场景测试包 packageId,拿到场景集合(包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
+//        List<ScenePO> scenePOList = projectService.getSceneList(projectId, packageId);
+//        int taskTotal = scenePOList.size();
+//        projectMessageDTO.setTaskTotal(taskTotal);
+//        projectMessageDTO.setTaskCompleted(0);
+//        //去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
+//        Set<ScenePO> scenePOSet = new HashSet<>(scenePOList);
+//        log.info("createTaskAndFixData() 项目 " + projectId + " 场景包括:" + scenePOSet);
+//        // -------------------------------- 2 算法导入 --------------------------------
+//        log.info("createTaskAndFixData() 项目 " + projectId + " 开始算法导入。");
+//        String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
+//        log.info("createTaskAndFixData() 项目 " + projectId + " 算法已导入:" + algorithmDockerImage);
+//        // -------------------------------- 3 查询模型 --------------------------------
+//        if ("1".equals(modelType)) {
+//            log.info("createTaskAndFixData() 项目 " + projectId + " 开始查询模型。");
+//            //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
+//            VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
+//            List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
+//            List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
+//            // -------------------------------- 4 保存任务消息 --------------------------------
+//            log.info("createTaskAndFixData() 项目 " + projectId + " 开始保存任务消息。");
+//            List<TaskPO> taskList = new ArrayList<>();
+//            for (ScenePO scenePO : scenePOSet) {
+//                String sceneId = scenePO.getId();
+//                //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
+//                List<String> lastTargetIdList = null;
+//                if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
+//                    lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
+//                } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
+//                    lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
+//                }
+//                if (CollectionUtil.isEmpty(lastTargetIdList)) {
+//                    throw new RuntimeException("createTaskAndFixData() 项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
+//                }
+//                for (String lastTargetId : lastTargetIdList) {
+//                    String taskId = StringUtil.getRandomUUID();
+//                    // 保存任务信息
+//                    TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
+//                            .id(taskId)
+//                            .pId(projectId)
+//                            .sceneId(sceneId)
+//                            .lastTargetId(lastTargetId)
+//                            .sceneName(scenePO.getName())
+//                            .sceneType(scenePO.getType())
+//                            .runState(DictConstants.TASK_PENDING)
+//                            .runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId)
+//                            .build();
+//                    taskPO.setCreateTime(TimeUtil.getNowForMysql());
+//                    taskPO.setCreateUserId(userId);
+//                    taskPO.setModifyTime(TimeUtil.getNowForMysql());
+//                    taskPO.setModifyUserId(userId);
+//                    taskPO.setModifyTime(TimeUtil.getNowForMysql());
+//                    taskPO.setIsDeleted("0");
+//                    taskList.add(taskPO);
+//                    // 将 xosc、xodr、osgb 全部上传到仿真结果路径
+//                    String scenarioOsc = scenePO.getScenarioOsc();
+//                    String[] splitXosc = scenarioOsc.split("/");
+//                    String xoscName = splitXosc[splitXosc.length - 1];
+//                    String[] xoscNameSplit = xoscName.split("\\.");
+//                    String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
+//                    String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
+//                    String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
+//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
+//                    MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
+//                    log.info("cacheManualProject() 已经将 xosc 上传到 minio 的结果文件目录:" + xoscPathOfMinio);
+//
+//                    String scenarioOdr = scenePO.getScenarioOdr();
+//                    String[] splitXodr = scenarioOdr.split("/");
+//                    String xodrName = splitXodr[splitXodr.length - 1];
+//                    String[] xodrNameSplit = xodrName.split("\\.");
+//                    String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
+//                    String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
+//                    String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
+//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
+//                    MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
+//                    log.info("cacheManualProject() 已经将 xodr 上传到 minio 的结果文件目录:" + xodrPathOfMinio);
+//
+//                    String scenarioOsgb = scenePO.getScenarioOsgb();
+//                    String[] splitOsgb = scenarioOsgb.split("/");
+//                    String osgbName = splitOsgb[splitOsgb.length - 1];
+//                    String[] osgbNameSplit = osgbName.split("\\.");
+//                    String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
+//                    String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
+//                    String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
+//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
+//                    MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
+//                    log.info("cacheManualProject() 已经将 osgb 上传到 minio 的结果文件目录:" + osgbPathOfMinio);
+//
+//                    // 组装 task 消息
+//                    TaskTO taskTO = TaskTO.builder()
+//                            .info(InfoTO.builder()
+//                                    .project_id(taskPO.getPId())
+//                                    .task_id(taskPO.getId())
+//                                    .task_path(taskPO.getRunResultFilePath())
+//                                    .default_time(videoTime)
+//                                    .build())
+//                            .scenario(ScenarioTO.builder()
+//                                    .scenario_osc(xoscPathOfMinio)
+//                                    .scenario_odr(xodrPathOfMinio)
+//                                    .scenario_osgb(osgbPathOfMinio)
+//                                    .build())
+//                            .vehicle(VehicleTO.builder()
+//                                    .model(ModelTO.builder()
+//                                            .model_label(vehiclePO.getModelLabel())
+//                                            .build())
+//                                    .dynamics(DynamicsTO.builder()
+//                                            .dynamics_maxspeed(vehiclePO.getMaxSpeed())
+//                                            .dynamics_enginepower(vehiclePO.getEnginePower())
+//                                            .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
+//                                            .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
+//                                            .dynamics_mass(vehiclePO.getMass())
+//                                            .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
+//                                            .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
+//                                            .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
+//                                            .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
+//                                            .dynamics_wheeldrive(vehiclePO.getWheelDrive())
+//                                            .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
+//                                            .dynamics_distfront(vehiclePO.getFrontDistance())
+//                                            .dynamics_distrear(vehiclePO.getRearDistance())
+//                                            .dynamics_distleft(vehiclePO.getLeftDistance())
+//                                            .dynamics_distright(vehiclePO.getRightDistance())
+//                                            .dynamics_distheight(vehiclePO.getHeightDistance())
+//                                            .dynamics_wheelbase(vehiclePO.getWheelbase())
+//                                            .build())
+//                                    .sensors(SensorsTO.builder()   // 根据 vehicleId 查询绑定的传感器列表
+//                                            .camera(cameraPOList)
+//                                            .OGT(ogtPOList)
+//                                            .build())
+//                                    .build())
+//                            .build();
+//
+//                    log.info("createTaskAndFixData() 项目 " + projectId + " 将对象转成 json 保存到临时目录等待资源分配后执行:" + taskTO);
+//                    FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskTO), projectPath + taskId + ".json");
+//                    log.info("createTaskAndFixData() 项目 " + projectId + " 将对象转成 json 保存到临时目录成功。");
+//                }
+//            }
+//            log.info("createTaskAndFixData() 项目 " + projectId + " 创建任务:" + taskList);
+//            taskManager.batchInsertTask(taskList);
+//            log.info("createTaskAndFixData() 项目 " + projectId + " 任务保存成功。");
+//        } else if ("2".equals(modelType)) {
+//            log.info("createTaskAndFixData() 项目 " + projectId + " 开始查询模型。");
+//
+//            VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
+//            List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
+//            List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
+//            // -------------------------------- 4 保存任务消息 --------------------------------
+//            log.info("createTaskAndFixData() 项目 " + projectId + " 开始保存任务消息。");
+//            List<TaskPO> taskList = new ArrayList<>();
+//            for (ScenePO scenePO : scenePOSet) {
+//                String sceneId = scenePO.getId();
+//                //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
+//                List<String> lastTargetIdList = null;
+//                if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
+//                    lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
+//                } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
+//                    lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
+//                }
+//                if (CollectionUtil.isEmpty(lastTargetIdList)) {
+//                    throw new RuntimeException("createTaskAndFixData() 项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
+//                }
+//                for (String lastTargetId : lastTargetIdList) {
+//                    String taskId = StringUtil.getRandomUUID();
+//                    // 保存任务信息
+//                    TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
+//                            .id(taskId)
+//                            .pId(projectId)
+//                            .sceneId(sceneId)
+//                            .lastTargetId(lastTargetId)
+//                            .sceneName(scenePO.getName())
+//                            .sceneType(scenePO.getType())
+//                            .runState(DictConstants.TASK_PENDING)
+//                            .runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId)
+//                            .build();
+//                    taskPO.setCreateTime(TimeUtil.getNowForMysql());
+//                    taskPO.setCreateUserId(userId);
+//                    taskPO.setModifyTime(TimeUtil.getNowForMysql());
+//                    taskPO.setModifyUserId(userId);
+//                    taskPO.setModifyTime(TimeUtil.getNowForMysql());
+//                    taskPO.setIsDeleted("0");
+//                    taskList.add(taskPO);
+//                    // 将 xosc、xodr、osgb 全部上传到仿真结果路径
+//                    String scenarioOsc = scenePO.getScenarioOsc();
+//                    String[] splitXosc = scenarioOsc.split("/");
+//                    String xoscName = splitXosc[splitXosc.length - 1];
+//                    String[] xoscNameSplit = xoscName.split("\\.");
+//                    String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
+//                    String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
+//                    String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
+//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
+//                    MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
+//                    log.info("cacheManualProject() 已经将 xosc 上传到 minio 的结果文件目录:" + xoscPathOfMinio);
+//
+//                    String scenarioOdr = scenePO.getScenarioOdr();
+//                    String[] splitXodr = scenarioOdr.split("/");
+//                    String xodrName = splitXodr[splitXodr.length - 1];
+//                    String[] xodrNameSplit = xodrName.split("\\.");
+//                    String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
+//                    String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
+//                    String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
+//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
+//                    MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
+//                    log.info("cacheManualProject() 已经将 xodr 上传到 minio 的结果文件目录:" + xodrPathOfMinio);
+//
+//                    String scenarioOsgb = scenePO.getScenarioOsgb();
+//                    String[] splitOsgb = scenarioOsgb.split("/");
+//                    String osgbName = splitOsgb[splitOsgb.length - 1];
+//                    String[] osgbNameSplit = osgbName.split("\\.");
+//                    String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
+//                    String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
+//                    String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
+//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
+//                    MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
+//                    log.info("cacheManualProject() 已经将 osgb 上传到 minio 的结果文件目录:" + osgbPathOfMinio);
+//
+//                    // 组装 task 消息
+//                    // carsim 不需要查询模型参数
+//                    TaskTO taskTO = TaskTO.builder()
+//                            .info(InfoTO.builder()
+//                                    .project_id(taskPO.getPId())
+//                                    .task_id(taskPO.getId())
+//                                    .task_path(taskPO.getRunResultFilePath())
+//                                    .default_time(videoTime)
+//                                    .build())
+//                            .scenario(ScenarioTO.builder()
+//                                    .scenario_osc(xoscPathOfMinio)
+//                                    .scenario_odr(xodrPathOfMinio)
+//                                    .scenario_osgb(osgbPathOfMinio)
+//                                    .build())
+//                            .vehicle(VehicleTO.builder()
+//                                    .model(ModelTO.builder()
+//                                            .model_label(vehiclePO.getModelLabel())
+//                                            .build())
+//                                    .dynamics(null)
+//                                    .sensors(SensorsTO.builder()   // 根据 vehicleId 查询绑定的传感器列表
+//                                            .camera(cameraPOList)
+//                                            .OGT(ogtPOList)
+//                                            .build())
+//                                    .build())
+//                            .build();
+//
+//                    log.info("createTaskAndFixData() 项目 " + projectId + " 将对象转成 json 保存到临时目录等待资源分配后执行:" + taskTO);
+//                    FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskTO), projectPath + taskId + ".json");
+//                    log.info("createTaskAndFixData() 项目 " + projectId + " 将对象转成 json 保存到临时目录成功。");
+//                }
+//            }
+//            log.info("createTaskAndFixData() 项目 " + projectId + " 创建任务:" + taskList);
+//            taskManager.batchInsertTask(taskList);
+//            log.info("createTaskAndFixData() 项目 " + projectId + " 任务保存成功。");
+//        }
+//
+//        //* -------------------------------- 4 开始排队 --------------------------------
+//        cacheProject(projectRecord);
+//
+//    }
+//
+//
+//    /**
+//     * 任务运行前首先判断用户是否拥有可分配资源
+//     *
+//     * @param projectRecord 项目启动消息
+//     */
+//    @SneakyThrows
+//    public void cacheProject(ConsumerRecord<String, String> projectRecord) {
+//        String initialProjectJson = projectRecord.value();
+//        log.info("cacheManualProject() 判断用户是否拥有可分配资源:" + initialProjectJson);
+//        //1 读取 kafka 的 project 信息
+//        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(initialProjectJson, ProjectMessageDTO.class);
+//        String projectId = projectMessageDTO.getProjectId();    // 手动执行项目 id 或 自动执行子项目 id
+//        long parallelism = projectMessageDTO.getParallelism();   // 项目并行度
+//        String projectType = projectMessageDTO.getType(); // 项目类型
+//        //2 根据 projectId 获取创建用户 id
+//        String userId;
+//        if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
+//            userId = manualProjectMapper.selectCreateUserById(projectId);
+//        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
+//            userId = autoSubProjectMapper.selectCreateUserById(projectId);
+//        } else {
+//            log.error("cacheManualProject() 项目类型错误:" + initialProjectJson);
+//            return;
+//        }
+//        if (StringUtil.isEmpty(userId)) {
+//            log.error("cacheManualProject() 未查询到项目创建人:" + initialProjectJson);
+//            return;
+//        }
+//        //3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
+//        UserPO userPO = userMapper.selectById(userId);
+//        log.info("cacheManualProject() 项目 " + projectId + " 的创建人为:" + userPO);
+//        String roleCode = userPO.getRoleCode();
+//        String useType = userPO.getUseType();
+//        ClusterPO clusterPO;
+//        if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
+//            log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
+//            PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
+//            run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
+//            return;
+//        } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
+//            clusterPO = clusterMapper.selectByUserId(userId);
+//            log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterPO);
+//        } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
+//            if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) {   //3-3 普通子账户,根据自己的独占节点排队
+//                clusterPO = clusterMapper.selectByUserId(userId);
+//                log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通独占子账户(自己的集群),集群为:" + clusterPO);
+//            } else {    //3-4 共享子账户,根据父账户的共享节点排队
+//                String parentUserId = userPO.getCreateUserId();
+//                clusterPO = clusterMapper.selectByUserId(parentUserId);
+//                log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通共享子账户(父账户的集群),集群为:" + clusterPO);
+//            }
+//        } else {
+//            log.error("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
+//            return;
+//        }
+//        // 获取拥有的节点数量,即仿真软件证书数量
+//        String clusterId = clusterPO.getId();
+//        int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
+//        // 获取该集群中正在运行的项目,如果没有则立即执行
+//        PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
+//        // 获取正在运行的项目的并行度总和
+//        int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
+//        // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
+//        if (currentParallelismSum + parallelism <= simulationLicenseNumber) {
+//            run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
+//        } else {
+//            wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
+//        }
+//    }
+//
+//    /**
+//     * @param projectMessageDTO 初始接收到的项目启动信息
+//     * @param clusterId         集群 id
+//     * @param projectRunningKey projectRunningKey
+//     * @param projectWaitingKey projectWaitingKey
+//     */
+//    public void run(ProjectMessageDTO projectMessageDTO, String clusterId, String projectRunningKey, String projectWaitingKey) {
+//
+//        String projectId = projectMessageDTO.getProjectId();
+//        int parallelism = projectMessageDTO.getParallelism();  // 期望并行度
+//        //1 获取集群剩余可用并行度
+//        int restParallelism = projectUtil.getRestParallelism();
+//        //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
+//        if (restParallelism > 0L) {
+//            log.info("run() 集群 " + clusterId + " 执行项目 " + projectId);
+//            // 设置实际的并行度
+//            projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism));   // 设置实际的并行度
+//            parseProject(projectMessageDTO, projectRunningKey);
+//        } else {
+//            log.info("run() 服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
+//            wait(projectWaitingKey, projectMessageDTO);
+//        }
+//    }
+//
+//    /**
+//     * @param projectWaitingKey 项目等待 key
+//     * @param projectMessageDTO 项目信息
+//     */
+//    @SneakyThrows
+//    public void wait(String projectWaitingKey, ProjectMessageDTO projectMessageDTO) {
+//        stringRedisTemplate.opsForValue().set(projectWaitingKey, JsonUtil.beanToJson(projectMessageDTO));
+//    }
+//
+//
+//    /**
+//     * @param projectMessageDTO 初始接收到的项目启动信息
+//     * @param projectRunningKey projectRunningKey
+//     */
+//    @SneakyThrows
+//    public void parseProject(ProjectMessageDTO projectMessageDTO, String projectRunningKey) {
+//        String projectId = projectMessageDTO.getProjectId();    // 项目 id
+//        String modelType = projectMessageDTO.getModelType();
+//        String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
+//        ProjectPO projectPO = projectUtil.getProjectByProjectId(projectId);
+//        log.info("parseProject() 项目 " + projectId + " 信息为:" + projectPO);
+//        String isChoiceGpu = projectPO.getIsChoiceGpu();
+//        // 项目类型
+//        int currentParallelism = projectMessageDTO.getCurrentParallelism();   // 当前并行度
+//
+//        // 场景测试包 id
+//        // 结果视频的时长
+//        // 模型配置 id
+//        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
+//        String projectPath = linuxTempPath + "project/" + projectId + "/";
+//        // -------------------------------- 0 准备 --------------------------------
+////        projectService.prepare(projectMessageDTO, projectWaitingKey, projectRunningKey);
+//        // -------------------------------- 1 获取任务 json 列表 --------------------------------
+//        List<String> taskJsonList = FileUtil.listAbsolutePathByTypeAndLength(projectPath, "json", 37);
+//        int taskTotal = taskJsonList.size();
+//        projectMessageDTO.setTaskTotal(taskTotal);
+//        projectMessageDTO.setTaskCompleted(0);
+//
+//        // 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
+//        Map<String, Integer> nodeMap;
+//        if (currentParallelism < taskTotal) {
+//            nodeMap = projectUtil.getNodeMapToUse(currentParallelism);
+//        } else {
+//            nodeMap = projectUtil.getNodeMapToUse(taskTotal);
+//        }
+//        // 将指定 node 的并行度减少
+//        nodeMap.keySet().forEach(nodeName -> {
+//            int parallelismToUse = nodeMap.get(nodeName);
+//            String restParallelismKey = "node:" + nodeName + ":parallelism";
+//            int restParallelism = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue()
+//                    .get(restParallelismKey))); // 剩余可用并行度
+//            stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelismToUse) + "");
+//        });
+//        // 重新设置实际使用的并行度并保存到 redis
+//        int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
+//        projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
+//        log.info("parseProject() 项目 " + projectId + " 运行在:" + nodeMap);
+//        stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
+//        //* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
+//        String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
+//        // -------------------------------- 4 发送任务消息 --------------------------------
+//        List<NodeTO> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
+//        log.info("parseProject() 项目 " + projectId + " 运行在:" + nodeListToCount);
+//        int messageNumber = 0;
+//        ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1);   // 创建主题
+//        TimeUnit.SECONDS.sleep(6);
+//        // 需要即时启动的任务(并行度的大小)
+//        CopyOnWriteArrayList<String> yamlListToRun = new CopyOnWriteArrayList<>();
+//        for (String taskJsonPath : taskJsonList) {
+//            String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
+//            //TODO 设置任务重试次数为 0,方便任务进行最大3次的重试。
+//            String taskRetryKey = projectRunningKey + ":task:" + taskId + ":retry";
+//            stringRedisTemplate.opsForValue().set(taskRetryKey, "0");
+//            // 保存运行中的任务信息
+//            String taskMessageKey = projectRunningKey + ":task:" + taskId + ":message";
+//            String taskJson = FileUtil.read(taskJsonPath);
+//            stringRedisTemplate.opsForValue().set(taskMessageKey, taskJson);
+//
+//            //4-5 将 projectId 作为 topic 名称,根据 parallelism 分散发送 task 信息到 kafka
+//            SendResult<String, String> stringStringSendResult = kafkaTemplate.send(projectId, messageNumber % currentParallelism, taskId, taskJson).get();
+//            RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
+//            String topic = recordMetadata.topic();  // 消息发送到的topic
+//            int partition = recordMetadata.partition(); // 消息发送到的分区
+//            long offset = recordMetadata.offset();  // 消息在分区内的offset
+//            log.info("parseProject() 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:"
+//                    + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
+//            //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
+//            // 选一个 count 最少的 node
+//            String currentNodeName = "";
+//            NodeTO currentNodeTO = null;
+//            int currentCount = Integer.MAX_VALUE;
+//            log.info("parseProject() 各节点已经预定的任务个数为:" + nodeListToCount);
+//            for (NodeTO nodeTO : nodeListToCount) {
+//                int tempCount = nodeTO.getCount();
+//                String tempNodeName = nodeTO.getNodeName();
+//                if (tempCount < currentCount) {
+//                    currentCount = tempCount;
+//                    currentNodeName = tempNodeName;
+//                    currentNodeTO = nodeTO;
+//                }
+//            }
+//            currentNodeTO.setCount(currentNodeTO.getCount() + 1);
+//
+//            log.info("parseProject() 项目 " + projectId + " 准备创建 yaml:是否使用 gpu " + isChoiceGpu + ",当前节点名称为:" + currentNodeName + ",当前节点已创建 yaml 个数为:" + currentCount);
+//            String tempYaml = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu);
+//            if (currentCount == 0) {
+//                log.info("parseProject() 加入到启动列表 " + tempYaml);
+//                yamlListToRun.add(tempYaml);
+//            }
+//            messageNumber++;
+//        }
+//        TimeUnit.SECONDS.sleep(6);
+//        log.info("parseProject() 项目 " + projectId + " 共发送了 " + messageNumber + " 条消息。");
+//        log.info("parseProject() 项目 " + projectId + " 准备首先启动 " + yamlListToRun);
+//        for (String yaml : yamlListToRun) {
+//            projectUtil.createPod2(yaml);
+//        }
+//        log.info("parseProject() 项目 " + projectId + " 已经启动 " + yamlListToRun);
+//    }
+//
+//    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")
+//    @SneakyThrows
+//    public void stopProject(ConsumerRecord<String, String> stopRecord) {
+//        log.info("stopProject() 接收到的项目终止消息为:" + stopRecord);
+//        //1 读取 kafka 的项目停止信息
+//        /*
+//            {
+//                "projectId": "sadfasdfs",	// 项目 id
+//                "type": "1",	// 项目类型
+//            }
+//         */
+//
+//        String json = stopRecord.value();
+//        ObjectMapper objectMapper = new ObjectMapper();
+//        JsonNode jsonNode = objectMapper.readTree(json);
+//        String projectId = jsonNode.path("projectId").asText();
+//        String type = jsonNode.path("type").asText();
+//        projectService.stopProject(projectId, type);
+//    }
+//
+//
+//}