root 2 年之前
父节点
当前提交
bd830767cd

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/resource/TaskLock.java

@@ -37,7 +37,7 @@ public class TaskLock {
             ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()), ProjectMessageDTO.class);
             int taskTotal = projectMessageDTO.getTaskTotal();
             int taskCompleted = projectMessageDTO.getTaskCompleted();
-            log.info("isProjectCompleted() 项目 " + projectId + " 完成进度为:" + (taskCompleted + 1) + "/" + taskTotal);
+            log.info("complete() 项目 " + projectId + " 完成进度为:" + (taskCompleted + 1) + "/" + taskTotal);
             if (taskCompleted + 1 == taskTotal) {
                 result = true;
             } else {    // 项目没有完成

+ 16 - 12
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/util/ProjectUtil.java

@@ -158,16 +158,15 @@ public class ProjectUtil {
             final String replace = read.replace("cpu-order", "\"" + cpuOrderString + "\"");
             FileUtil.writeStringToLocalFile(replace, absolutePath);
             // 创建 pod
-            createPod3(yamlPathCacheKey);
+            createPod3(yamlPathCacheKey, cpuOrderString);
             log.info("createNextPod3() 创建项目 " + projectId + " 在节点 " + nodeName + " 的下一个 pod,使用 cpu 编号为 " + cpuOrderString);
         }
     }
 
-
     /**
      * @param redisKey yaml 地址的缓存 key
      */
-    public void createPod3(String redisKey) {
+    public void createPodBegin(String redisKey) {
         final String podYamlPath = stringRedisTemplate.opsForValue().get(redisKey);
         if (podYamlPath == null) {
             throw new RuntimeException("createPod3() 根据缓存 key 获取 yaml 地址为 null:" + redisKey);
@@ -180,18 +179,23 @@ public class ProjectUtil {
     }
 
 
-    public String getProjectTypeByProjectId(String projectId) {
-        String projectType = null;
-        ProjectPO manualProjectPO = manualProjectMapper.selectById(projectId);
-        ProjectPO autoSubProjectPO = autoSubProjectMapper.selectById(projectId);
-        if (manualProjectPO != null) {
-            projectType = DictConstants.PROJECT_TYPE_MANUAL;
-        } else if (autoSubProjectPO != null) {
-            projectType = DictConstants.PROJECT_TYPE_AUTO_SUB;
+    /**
+     * @param redisKey yaml 地址的缓存 key
+     */
+    public void createPod3(String redisKey, String cpuOrderString) {
+        final String podYamlPath = stringRedisTemplate.opsForValue().get(redisKey);
+        if (podYamlPath == null) {
+            throw new RuntimeException("createPod3() 根据缓存 key 获取 yaml 地址为 null:" + redisKey);
         }
-        return projectType;
+        stringRedisTemplate.delete(redisKey);
+        String nodeName = new File(podYamlPath).getName().split("#")[0];
+        String podName = podYamlPath.split("#")[1].split("\\.")[0];
+        stringRedisTemplate.opsForValue().set("pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
+        stringRedisTemplate.opsForValue().set("pod:" + podName + ":cpu", cpuOrderString);    // 将 pod 运行在哪个 node 上记录到 redis
+        new Thread(() -> KubernetesUtil.applyYaml(hostname, username, password, podYamlPath), "create-" + podName).start();
     }
 
+
     public ProjectPO getProjectByProjectId(String projectId) {
         ProjectPO manualProjectPO = manualProjectMapper.selectById(projectId);
         ProjectPO autoSubProjectPO = autoSubProjectMapper.selectById(projectId);

+ 2 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/manager/TaskManager.java

@@ -173,6 +173,8 @@ public class TaskManager {
                     KubernetesUtil.deletePod2(apiClient, kubernetesConfiguration.getNamespace(), podName);
                     projectUtil.addOneParallelismToNode(nodeName);
                 }
+                RedisUtil.deleteByPrefix(stringRedisTemplate,redisPrefix.getTaskMessageKey());
+                RedisUtil.deleteByPrefix(stringRedisTemplate,redisPrefix.getTaskPodKey());
             } catch (Exception exception) {
                 log.info("isCompleted() 报错。", exception);
             }

+ 2 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/mapper/TaskMapper.java

@@ -4,6 +4,7 @@ package com.css.simulation.resource.scheduler.dao.mapper;
 import com.css.simulation.resource.scheduler.dao.entity.TaskPO;
 import org.apache.ibatis.annotations.*;
 import org.apache.ibatis.type.JdbcType;
+import org.springframework.context.annotation.Scope;
 
 import java.sql.Timestamp;
 import java.util.List;
@@ -12,6 +13,7 @@ import java.util.List;
  * simulation_manual_project_task
  */
 @Mapper
+@Scope
 public interface TaskMapper {
 
     @Results(id = "task", value = {

+ 6 - 9
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/consumer/ProjectConsumer.java

@@ -522,9 +522,6 @@ public class ProjectConsumer {
         CopyOnWriteArrayList<String> yamlToRunRedisKeyList = new CopyOnWriteArrayList<>();
         for (String taskJsonPath : taskJsonList) {
             String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
-            //TODO 设置任务重试次数为 0,方便任务进行最大3次的重试。
-            String taskRetryKey = projectRunningKey + ":task:" + taskId + ":retry";
-            stringRedisTemplate.opsForValue().set(taskRetryKey, "0");
             // 保存运行中的任务信息
             String taskMessageKey = projectRunningKey + ":task:" + taskId + ":message";
             String taskJson = FileUtil.read(taskJsonPath);
@@ -578,7 +575,7 @@ public class ProjectConsumer {
         TimeUnit.SECONDS.sleep(6);
         log.info("parseProject() 项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
         for (String redisKey : yamlToRunRedisKeyList) {
-            projectUtil.createPod3(redisKey);
+            projectUtil.createPodBegin(redisKey);
         }
         log.info("parseProject() 项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
     }
@@ -614,11 +611,11 @@ public class ProjectConsumer {
         // 将项目状态修改为终止中
         String projectId = jsonNode.path("projectId").asText();
         String type = jsonNode.path("type").asText();
-        if (DictConstants.PROJECT_TYPE_MANUAL.equals(type)) {
-            manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATING, TimeUtil.getNowForMysql());
-        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(type)) {
-            autoSubProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATING, TimeUtil.getNowForMysql());
-        }
+//        if (DictConstants.PROJECT_TYPE_MANUAL.equals(type)) {
+//            manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATING, TimeUtil.getNowForMysql());
+//        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(type)) {
+//            autoSubProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATING, TimeUtil.getNowForMysql());
+//        }
         projectService.stopProject(projectId, type);
     }
 

+ 0 - 617
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/consumer/ProjectConsumerBackup.java

@@ -1,617 +0,0 @@
-//package com.css.simulation.resource.scheduler.consumer;
-//
-//
-//import api.common.pojo.constants.DictConstants;
-//import api.common.pojo.dto.ProjectMessageDTO;
-//import api.common.util.*;
-//import com.css.simulation.resource.scheduler.manager.ProjectManager;
-//import com.css.simulation.resource.scheduler.manager.TaskManager;
-//import com.css.simulation.resource.scheduler.mapper.*;
-//import com.css.simulation.resource.scheduler.pojo.po.*;
-//import com.css.simulation.resource.scheduler.pojo.to.*;
-//import com.css.simulation.resource.scheduler.service.ProjectService;
-//import com.css.simulation.resource.scheduler.util.ApacheKafkaUtil;
-//import com.css.simulation.resource.scheduler.util.MinioUtil;
-//import com.css.simulation.resource.scheduler.util.ProjectUtil;
-//import com.fasterxml.jackson.databind.JsonNode;
-//import com.fasterxml.jackson.databind.ObjectMapper;
-//import io.minio.MinioClient;
-//import lombok.SneakyThrows;
-//import lombok.extern.slf4j.Slf4j;
-//import org.apache.kafka.clients.admin.Admin;
-//import org.apache.kafka.clients.consumer.ConsumerRecord;
-//import org.apache.kafka.clients.producer.RecordMetadata;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.data.redis.core.StringRedisTemplate;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.kafka.core.KafkaTemplate;
-//import org.springframework.kafka.support.SendResult;
-//import org.springframework.stereotype.Component;
-//
-//import javax.annotation.Resource;
-//import java.util.*;
-//import java.util.concurrent.CopyOnWriteArrayList;
-//import java.util.concurrent.TimeUnit;
-//
-//@Component
-//@Slf4j
-//public class ProjectConsumerBackup {
-//    @Value("${scheduler.linux-path.temp}")
-//    private String linuxTempPath;
-//    @Value("${scheduler.minio-path.project-result}")
-//    private String projectResultPathOfMinio;
-//    @Value("${minio.bucket-name}")
-//    private String bucketName;
-//
-//    // -------------------------------- Comment --------------------------------
-//    @Resource
-//    private MinioClient minioClient;
-//    @Resource
-//    private StringRedisTemplate stringRedisTemplate;
-//    @Resource
-//    private ManualProjectMapper manualProjectMapper;
-//    @Resource
-//    private AutoSubProjectMapper autoSubProjectMapper;
-//    @Resource
-//    private VehicleMapper vehicleMapper;
-//    @Resource
-//    private SensorCameraMapper sensorCameraMapper;
-//    @Resource
-//    private SensorOgtMapper sensorOgtMapper;
-//    @Resource
-//    private AlgorithmMapper algorithmMapper;
-//    @Resource
-//    private UserMapper userMapper;
-//    @Resource
-//    private ClusterMapper clusterMapper;
-//    @Resource
-//    private ProjectService projectService;
-//    @Resource
-//    private ProjectUtil projectUtil;
-//    @Resource
-//    private IndexMapper indexMapper;
-//    @Resource
-//    private TaskMapper taskMapper;
-//    @Resource
-//    private TaskManager taskManager;
-//    @Resource
-//    private ProjectManager projectManager;
-//    @Resource
-//    private KafkaTemplate<String, String> kafkaTemplate;
-//    @Resource(name = "myKafkaAdmin")
-//    private Admin kafkaAdminClient;
-//
-//    /**
-//     * 接收到运行信息立即复制一份数据作为运行数据
-//     *
-//     * @param projectRecord 项目启动消息
-//     */
-//    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.start-topic}")
-//    @SneakyThrows
-//    public void acceptMessage(ConsumerRecord<String, String> projectRecord) {
-//        String initialProjectJson = projectRecord.value();
-//        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(initialProjectJson, ProjectMessageDTO.class);
-//        String projectId = projectMessageDTO.getProjectId();        // 手动执行项目 id 或 自动执行子项目 id
-//        new Thread(() -> createTaskAndFixData(projectRecord), "fix-" + projectId).start();
-//    }
-//
-//
-//    @SneakyThrows
-//    public void createTaskAndFixData(ConsumerRecord<String, String> projectRecord) {
-//        //* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
-//        String initialProjectJson = projectRecord.value();
-//        log.info("createTaskAndFixData() 接收到项目开始消息为:" + initialProjectJson);
-//        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(initialProjectJson, ProjectMessageDTO.class);
-//        String projectId = projectMessageDTO.getProjectId();                // 手动执行项目 id 或 自动执行子项目 id
-//        String modelType = projectMessageDTO.getModelType();                // 模型类型,1 动力学模型 2 carsim模型
-//        String packageId = projectMessageDTO.getScenePackageId();           // 场景测试包 id
-//        String vehicleConfigId = projectMessageDTO.getVehicleConfigId();    // 模型配置 id
-//        String algorithmId = projectMessageDTO.getAlgorithmId();            // 模型配置 id
-//        long videoTime = projectMessageDTO.getMaxSimulationTime();          // 结果视频的时长
-//        String projectType = projectMessageDTO.getType();                   // 项目类型
-//        String userId = "";  // 用户 id
-//        if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-//            userId = manualProjectMapper.selectCreateUserById(projectId);
-//        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-//            userId = autoSubProjectMapper.selectCreateUserById(projectId);
-//        }
-//        String projectPath = linuxTempPath + "project/" + projectId + "/";
-//        FileUtil.mkdir(projectPath);
-//        //5 将该 project 下所有旧的指标得分删除。
-//        taskMapper.deleteByProject(projectId);
-//        indexMapper.deleteFirstTargetScoreByProjectId(projectId);
-//        indexMapper.deleteLastTargetScoreByProjectId(projectId);
-//        // -------------------------------- 1 查询场景 --------------------------------
-//        log.info("createTaskAndFixData() 项目 " + projectId + " 开始查询场景。");
-//        //根据场景测试包 packageId,拿到场景集合(包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
-//        List<ScenePO> scenePOList = projectService.getSceneList(projectId, packageId);
-//        int taskTotal = scenePOList.size();
-//        projectMessageDTO.setTaskTotal(taskTotal);
-//        projectMessageDTO.setTaskCompleted(0);
-//        //去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
-//        Set<ScenePO> scenePOSet = new HashSet<>(scenePOList);
-//        log.info("createTaskAndFixData() 项目 " + projectId + " 场景包括:" + scenePOSet);
-//        // -------------------------------- 2 算法导入 --------------------------------
-//        log.info("createTaskAndFixData() 项目 " + projectId + " 开始算法导入。");
-//        String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
-//        log.info("createTaskAndFixData() 项目 " + projectId + " 算法已导入:" + algorithmDockerImage);
-//        // -------------------------------- 3 查询模型 --------------------------------
-//        if ("1".equals(modelType)) {
-//            log.info("createTaskAndFixData() 项目 " + projectId + " 开始查询模型。");
-//            //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
-//            VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
-//            List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
-//            List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
-//            // -------------------------------- 4 保存任务消息 --------------------------------
-//            log.info("createTaskAndFixData() 项目 " + projectId + " 开始保存任务消息。");
-//            List<TaskPO> taskList = new ArrayList<>();
-//            for (ScenePO scenePO : scenePOSet) {
-//                String sceneId = scenePO.getId();
-//                //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
-//                List<String> lastTargetIdList = null;
-//                if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-//                    lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
-//                } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-//                    lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
-//                }
-//                if (CollectionUtil.isEmpty(lastTargetIdList)) {
-//                    throw new RuntimeException("createTaskAndFixData() 项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
-//                }
-//                for (String lastTargetId : lastTargetIdList) {
-//                    String taskId = StringUtil.getRandomUUID();
-//                    // 保存任务信息
-//                    TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
-//                            .id(taskId)
-//                            .pId(projectId)
-//                            .sceneId(sceneId)
-//                            .lastTargetId(lastTargetId)
-//                            .sceneName(scenePO.getName())
-//                            .sceneType(scenePO.getType())
-//                            .runState(DictConstants.TASK_PENDING)
-//                            .runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId)
-//                            .build();
-//                    taskPO.setCreateTime(TimeUtil.getNowForMysql());
-//                    taskPO.setCreateUserId(userId);
-//                    taskPO.setModifyTime(TimeUtil.getNowForMysql());
-//                    taskPO.setModifyUserId(userId);
-//                    taskPO.setModifyTime(TimeUtil.getNowForMysql());
-//                    taskPO.setIsDeleted("0");
-//                    taskList.add(taskPO);
-//                    // 将 xosc、xodr、osgb 全部上传到仿真结果路径
-//                    String scenarioOsc = scenePO.getScenarioOsc();
-//                    String[] splitXosc = scenarioOsc.split("/");
-//                    String xoscName = splitXosc[splitXosc.length - 1];
-//                    String[] xoscNameSplit = xoscName.split("\\.");
-//                    String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
-//                    String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
-//                    String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
-//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
-//                    MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
-//                    log.info("cacheManualProject() 已经将 xosc 上传到 minio 的结果文件目录:" + xoscPathOfMinio);
-//
-//                    String scenarioOdr = scenePO.getScenarioOdr();
-//                    String[] splitXodr = scenarioOdr.split("/");
-//                    String xodrName = splitXodr[splitXodr.length - 1];
-//                    String[] xodrNameSplit = xodrName.split("\\.");
-//                    String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
-//                    String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
-//                    String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
-//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
-//                    MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
-//                    log.info("cacheManualProject() 已经将 xodr 上传到 minio 的结果文件目录:" + xodrPathOfMinio);
-//
-//                    String scenarioOsgb = scenePO.getScenarioOsgb();
-//                    String[] splitOsgb = scenarioOsgb.split("/");
-//                    String osgbName = splitOsgb[splitOsgb.length - 1];
-//                    String[] osgbNameSplit = osgbName.split("\\.");
-//                    String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
-//                    String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
-//                    String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
-//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
-//                    MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
-//                    log.info("cacheManualProject() 已经将 osgb 上传到 minio 的结果文件目录:" + osgbPathOfMinio);
-//
-//                    // 组装 task 消息
-//                    TaskTO taskTO = TaskTO.builder()
-//                            .info(InfoTO.builder()
-//                                    .project_id(taskPO.getPId())
-//                                    .task_id(taskPO.getId())
-//                                    .task_path(taskPO.getRunResultFilePath())
-//                                    .default_time(videoTime)
-//                                    .build())
-//                            .scenario(ScenarioTO.builder()
-//                                    .scenario_osc(xoscPathOfMinio)
-//                                    .scenario_odr(xodrPathOfMinio)
-//                                    .scenario_osgb(osgbPathOfMinio)
-//                                    .build())
-//                            .vehicle(VehicleTO.builder()
-//                                    .model(ModelTO.builder()
-//                                            .model_label(vehiclePO.getModelLabel())
-//                                            .build())
-//                                    .dynamics(DynamicsTO.builder()
-//                                            .dynamics_maxspeed(vehiclePO.getMaxSpeed())
-//                                            .dynamics_enginepower(vehiclePO.getEnginePower())
-//                                            .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
-//                                            .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
-//                                            .dynamics_mass(vehiclePO.getMass())
-//                                            .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
-//                                            .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
-//                                            .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
-//                                            .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
-//                                            .dynamics_wheeldrive(vehiclePO.getWheelDrive())
-//                                            .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
-//                                            .dynamics_distfront(vehiclePO.getFrontDistance())
-//                                            .dynamics_distrear(vehiclePO.getRearDistance())
-//                                            .dynamics_distleft(vehiclePO.getLeftDistance())
-//                                            .dynamics_distright(vehiclePO.getRightDistance())
-//                                            .dynamics_distheight(vehiclePO.getHeightDistance())
-//                                            .dynamics_wheelbase(vehiclePO.getWheelbase())
-//                                            .build())
-//                                    .sensors(SensorsTO.builder()   // 根据 vehicleId 查询绑定的传感器列表
-//                                            .camera(cameraPOList)
-//                                            .OGT(ogtPOList)
-//                                            .build())
-//                                    .build())
-//                            .build();
-//
-//                    log.info("createTaskAndFixData() 项目 " + projectId + " 将对象转成 json 保存到临时目录等待资源分配后执行:" + taskTO);
-//                    FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskTO), projectPath + taskId + ".json");
-//                    log.info("createTaskAndFixData() 项目 " + projectId + " 将对象转成 json 保存到临时目录成功。");
-//                }
-//            }
-//            log.info("createTaskAndFixData() 项目 " + projectId + " 创建任务:" + taskList);
-//            taskManager.batchInsertTask(taskList);
-//            log.info("createTaskAndFixData() 项目 " + projectId + " 任务保存成功。");
-//        } else if ("2".equals(modelType)) {
-//            log.info("createTaskAndFixData() 项目 " + projectId + " 开始查询模型。");
-//
-//            VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
-//            List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
-//            List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
-//            // -------------------------------- 4 保存任务消息 --------------------------------
-//            log.info("createTaskAndFixData() 项目 " + projectId + " 开始保存任务消息。");
-//            List<TaskPO> taskList = new ArrayList<>();
-//            for (ScenePO scenePO : scenePOSet) {
-//                String sceneId = scenePO.getId();
-//                //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
-//                List<String> lastTargetIdList = null;
-//                if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-//                    lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
-//                } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-//                    lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
-//                }
-//                if (CollectionUtil.isEmpty(lastTargetIdList)) {
-//                    throw new RuntimeException("createTaskAndFixData() 项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
-//                }
-//                for (String lastTargetId : lastTargetIdList) {
-//                    String taskId = StringUtil.getRandomUUID();
-//                    // 保存任务信息
-//                    TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
-//                            .id(taskId)
-//                            .pId(projectId)
-//                            .sceneId(sceneId)
-//                            .lastTargetId(lastTargetId)
-//                            .sceneName(scenePO.getName())
-//                            .sceneType(scenePO.getType())
-//                            .runState(DictConstants.TASK_PENDING)
-//                            .runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId)
-//                            .build();
-//                    taskPO.setCreateTime(TimeUtil.getNowForMysql());
-//                    taskPO.setCreateUserId(userId);
-//                    taskPO.setModifyTime(TimeUtil.getNowForMysql());
-//                    taskPO.setModifyUserId(userId);
-//                    taskPO.setModifyTime(TimeUtil.getNowForMysql());
-//                    taskPO.setIsDeleted("0");
-//                    taskList.add(taskPO);
-//                    // 将 xosc、xodr、osgb 全部上传到仿真结果路径
-//                    String scenarioOsc = scenePO.getScenarioOsc();
-//                    String[] splitXosc = scenarioOsc.split("/");
-//                    String xoscName = splitXosc[splitXosc.length - 1];
-//                    String[] xoscNameSplit = xoscName.split("\\.");
-//                    String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
-//                    String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
-//                    String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
-//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
-//                    MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
-//                    log.info("cacheManualProject() 已经将 xosc 上传到 minio 的结果文件目录:" + xoscPathOfMinio);
-//
-//                    String scenarioOdr = scenePO.getScenarioOdr();
-//                    String[] splitXodr = scenarioOdr.split("/");
-//                    String xodrName = splitXodr[splitXodr.length - 1];
-//                    String[] xodrNameSplit = xodrName.split("\\.");
-//                    String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
-//                    String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
-//                    String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
-//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
-//                    MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
-//                    log.info("cacheManualProject() 已经将 xodr 上传到 minio 的结果文件目录:" + xodrPathOfMinio);
-//
-//                    String scenarioOsgb = scenePO.getScenarioOsgb();
-//                    String[] splitOsgb = scenarioOsgb.split("/");
-//                    String osgbName = splitOsgb[splitOsgb.length - 1];
-//                    String[] osgbNameSplit = osgbName.split("\\.");
-//                    String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
-//                    String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
-//                    String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
-//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
-//                    MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
-//                    log.info("cacheManualProject() 已经将 osgb 上传到 minio 的结果文件目录:" + osgbPathOfMinio);
-//
-//                    // 组装 task 消息
-//                    // carsim 不需要查询模型参数
-//                    TaskTO taskTO = TaskTO.builder()
-//                            .info(InfoTO.builder()
-//                                    .project_id(taskPO.getPId())
-//                                    .task_id(taskPO.getId())
-//                                    .task_path(taskPO.getRunResultFilePath())
-//                                    .default_time(videoTime)
-//                                    .build())
-//                            .scenario(ScenarioTO.builder()
-//                                    .scenario_osc(xoscPathOfMinio)
-//                                    .scenario_odr(xodrPathOfMinio)
-//                                    .scenario_osgb(osgbPathOfMinio)
-//                                    .build())
-//                            .vehicle(VehicleTO.builder()
-//                                    .model(ModelTO.builder()
-//                                            .model_label(vehiclePO.getModelLabel())
-//                                            .build())
-//                                    .dynamics(null)
-//                                    .sensors(SensorsTO.builder()   // 根据 vehicleId 查询绑定的传感器列表
-//                                            .camera(cameraPOList)
-//                                            .OGT(ogtPOList)
-//                                            .build())
-//                                    .build())
-//                            .build();
-//
-//                    log.info("createTaskAndFixData() 项目 " + projectId + " 将对象转成 json 保存到临时目录等待资源分配后执行:" + taskTO);
-//                    FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskTO), projectPath + taskId + ".json");
-//                    log.info("createTaskAndFixData() 项目 " + projectId + " 将对象转成 json 保存到临时目录成功。");
-//                }
-//            }
-//            log.info("createTaskAndFixData() 项目 " + projectId + " 创建任务:" + taskList);
-//            taskManager.batchInsertTask(taskList);
-//            log.info("createTaskAndFixData() 项目 " + projectId + " 任务保存成功。");
-//        }
-//
-//        //* -------------------------------- 4 开始排队 --------------------------------
-//        cacheProject(projectRecord);
-//
-//    }
-//
-//
-//    /**
-//     * 任务运行前首先判断用户是否拥有可分配资源
-//     *
-//     * @param projectRecord 项目启动消息
-//     */
-//    @SneakyThrows
-//    public void cacheProject(ConsumerRecord<String, String> projectRecord) {
-//        String initialProjectJson = projectRecord.value();
-//        log.info("cacheManualProject() 判断用户是否拥有可分配资源:" + initialProjectJson);
-//        //1 读取 kafka 的 project 信息
-//        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(initialProjectJson, ProjectMessageDTO.class);
-//        String projectId = projectMessageDTO.getProjectId();    // 手动执行项目 id 或 自动执行子项目 id
-//        long parallelism = projectMessageDTO.getParallelism();   // 项目并行度
-//        String projectType = projectMessageDTO.getType(); // 项目类型
-//        //2 根据 projectId 获取创建用户 id
-//        String userId;
-//        if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-//            userId = manualProjectMapper.selectCreateUserById(projectId);
-//        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-//            userId = autoSubProjectMapper.selectCreateUserById(projectId);
-//        } else {
-//            log.error("cacheManualProject() 项目类型错误:" + initialProjectJson);
-//            return;
-//        }
-//        if (StringUtil.isEmpty(userId)) {
-//            log.error("cacheManualProject() 未查询到项目创建人:" + initialProjectJson);
-//            return;
-//        }
-//        //3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
-//        UserPO userPO = userMapper.selectById(userId);
-//        log.info("cacheManualProject() 项目 " + projectId + " 的创建人为:" + userPO);
-//        String roleCode = userPO.getRoleCode();
-//        String useType = userPO.getUseType();
-//        ClusterPO clusterPO;
-//        if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
-//            log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
-//            PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
-//            run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
-//            return;
-//        } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
-//            clusterPO = clusterMapper.selectByUserId(userId);
-//            log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterPO);
-//        } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
-//            if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) {   //3-3 普通子账户,根据自己的独占节点排队
-//                clusterPO = clusterMapper.selectByUserId(userId);
-//                log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通独占子账户(自己的集群),集群为:" + clusterPO);
-//            } else {    //3-4 共享子账户,根据父账户的共享节点排队
-//                String parentUserId = userPO.getCreateUserId();
-//                clusterPO = clusterMapper.selectByUserId(parentUserId);
-//                log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通共享子账户(父账户的集群),集群为:" + clusterPO);
-//            }
-//        } else {
-//            log.error("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
-//            return;
-//        }
-//        // 获取拥有的节点数量,即仿真软件证书数量
-//        String clusterId = clusterPO.getId();
-//        int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
-//        // 获取该集群中正在运行的项目,如果没有则立即执行
-//        PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
-//        // 获取正在运行的项目的并行度总和
-//        int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
-//        // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
-//        if (currentParallelismSum + parallelism <= simulationLicenseNumber) {
-//            run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
-//        } else {
-//            wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
-//        }
-//    }
-//
-//    /**
-//     * @param projectMessageDTO 初始接收到的项目启动信息
-//     * @param clusterId         集群 id
-//     * @param projectRunningKey projectRunningKey
-//     * @param projectWaitingKey projectWaitingKey
-//     */
-//    public void run(ProjectMessageDTO projectMessageDTO, String clusterId, String projectRunningKey, String projectWaitingKey) {
-//
-//        String projectId = projectMessageDTO.getProjectId();
-//        int parallelism = projectMessageDTO.getParallelism();  // 期望并行度
-//        //1 获取集群剩余可用并行度
-//        int restParallelism = projectUtil.getRestParallelism();
-//        //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
-//        if (restParallelism > 0L) {
-//            log.info("run() 集群 " + clusterId + " 执行项目 " + projectId);
-//            // 设置实际的并行度
-//            projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism));   // 设置实际的并行度
-//            parseProject(projectMessageDTO, projectRunningKey);
-//        } else {
-//            log.info("run() 服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
-//            wait(projectWaitingKey, projectMessageDTO);
-//        }
-//    }
-//
-//    /**
-//     * @param projectWaitingKey 项目等待 key
-//     * @param projectMessageDTO 项目信息
-//     */
-//    @SneakyThrows
-//    public void wait(String projectWaitingKey, ProjectMessageDTO projectMessageDTO) {
-//        stringRedisTemplate.opsForValue().set(projectWaitingKey, JsonUtil.beanToJson(projectMessageDTO));
-//    }
-//
-//
-//    /**
-//     * @param projectMessageDTO 初始接收到的项目启动信息
-//     * @param projectRunningKey projectRunningKey
-//     */
-//    @SneakyThrows
-//    public void parseProject(ProjectMessageDTO projectMessageDTO, String projectRunningKey) {
-//        String projectId = projectMessageDTO.getProjectId();    // 项目 id
-//        String modelType = projectMessageDTO.getModelType();
-//        String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
-//        ProjectPO projectPO = projectUtil.getProjectByProjectId(projectId);
-//        log.info("parseProject() 项目 " + projectId + " 信息为:" + projectPO);
-//        String isChoiceGpu = projectPO.getIsChoiceGpu();
-//        // 项目类型
-//        int currentParallelism = projectMessageDTO.getCurrentParallelism();   // 当前并行度
-//
-//        // 场景测试包 id
-//        // 结果视频的时长
-//        // 模型配置 id
-//        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
-//        String projectPath = linuxTempPath + "project/" + projectId + "/";
-//        // -------------------------------- 0 准备 --------------------------------
-////        projectService.prepare(projectMessageDTO, projectWaitingKey, projectRunningKey);
-//        // -------------------------------- 1 获取任务 json 列表 --------------------------------
-//        List<String> taskJsonList = FileUtil.listAbsolutePathByTypeAndLength(projectPath, "json", 37);
-//        int taskTotal = taskJsonList.size();
-//        projectMessageDTO.setTaskTotal(taskTotal);
-//        projectMessageDTO.setTaskCompleted(0);
-//
-//        // 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
-//        Map<String, Integer> nodeMap;
-//        if (currentParallelism < taskTotal) {
-//            nodeMap = projectUtil.getNodeMapToUse(currentParallelism);
-//        } else {
-//            nodeMap = projectUtil.getNodeMapToUse(taskTotal);
-//        }
-//        // 将指定 node 的并行度减少
-//        nodeMap.keySet().forEach(nodeName -> {
-//            int parallelismToUse = nodeMap.get(nodeName);
-//            String restParallelismKey = "node:" + nodeName + ":parallelism";
-//            int restParallelism = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue()
-//                    .get(restParallelismKey))); // 剩余可用并行度
-//            stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelismToUse) + "");
-//        });
-//        // 重新设置实际使用的并行度并保存到 redis
-//        int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
-//        projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
-//        log.info("parseProject() 项目 " + projectId + " 运行在:" + nodeMap);
-//        stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
-//        //* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
-//        String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
-//        // -------------------------------- 4 发送任务消息 --------------------------------
-//        List<NodeTO> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
-//        log.info("parseProject() 项目 " + projectId + " 运行在:" + nodeListToCount);
-//        int messageNumber = 0;
-//        ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1);   // 创建主题
-//        TimeUnit.SECONDS.sleep(6);
-//        // 需要即时启动的任务(并行度的大小)
-//        CopyOnWriteArrayList<String> yamlListToRun = new CopyOnWriteArrayList<>();
-//        for (String taskJsonPath : taskJsonList) {
-//            String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
-//            //TODO 设置任务重试次数为 0,方便任务进行最大3次的重试。
-//            String taskRetryKey = projectRunningKey + ":task:" + taskId + ":retry";
-//            stringRedisTemplate.opsForValue().set(taskRetryKey, "0");
-//            // 保存运行中的任务信息
-//            String taskMessageKey = projectRunningKey + ":task:" + taskId + ":message";
-//            String taskJson = FileUtil.read(taskJsonPath);
-//            stringRedisTemplate.opsForValue().set(taskMessageKey, taskJson);
-//
-//            //4-5 将 projectId 作为 topic 名称,根据 parallelism 分散发送 task 信息到 kafka
-//            SendResult<String, String> stringStringSendResult = kafkaTemplate.send(projectId, messageNumber % currentParallelism, taskId, taskJson).get();
-//            RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
-//            String topic = recordMetadata.topic();  // 消息发送到的topic
-//            int partition = recordMetadata.partition(); // 消息发送到的分区
-//            long offset = recordMetadata.offset();  // 消息在分区内的offset
-//            log.info("parseProject() 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:"
-//                    + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
-//            //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
-//            // 选一个 count 最少的 node
-//            String currentNodeName = "";
-//            NodeTO currentNodeTO = null;
-//            int currentCount = Integer.MAX_VALUE;
-//            log.info("parseProject() 各节点已经预定的任务个数为:" + nodeListToCount);
-//            for (NodeTO nodeTO : nodeListToCount) {
-//                int tempCount = nodeTO.getCount();
-//                String tempNodeName = nodeTO.getNodeName();
-//                if (tempCount < currentCount) {
-//                    currentCount = tempCount;
-//                    currentNodeName = tempNodeName;
-//                    currentNodeTO = nodeTO;
-//                }
-//            }
-//            currentNodeTO.setCount(currentNodeTO.getCount() + 1);
-//
-//            log.info("parseProject() 项目 " + projectId + " 准备创建 yaml:是否使用 gpu " + isChoiceGpu + ",当前节点名称为:" + currentNodeName + ",当前节点已创建 yaml 个数为:" + currentCount);
-//            String tempYaml = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu);
-//            if (currentCount == 0) {
-//                log.info("parseProject() 加入到启动列表 " + tempYaml);
-//                yamlListToRun.add(tempYaml);
-//            }
-//            messageNumber++;
-//        }
-//        TimeUnit.SECONDS.sleep(6);
-//        log.info("parseProject() 项目 " + projectId + " 共发送了 " + messageNumber + " 条消息。");
-//        log.info("parseProject() 项目 " + projectId + " 准备首先启动 " + yamlListToRun);
-//        for (String yaml : yamlListToRun) {
-//            projectUtil.createPod2(yaml);
-//        }
-//        log.info("parseProject() 项目 " + projectId + " 已经启动 " + yamlListToRun);
-//    }
-//
-//    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")
-//    @SneakyThrows
-//    public void stopProject(ConsumerRecord<String, String> stopRecord) {
-//        log.info("stopProject() 接收到的项目终止消息为:" + stopRecord);
-//        //1 读取 kafka 的项目停止信息
-//        /*
-//            {
-//                "projectId": "sadfasdfs",	// 项目 id
-//                "type": "1",	// 项目类型
-//            }
-//         */
-//
-//        String json = stopRecord.value();
-//        ObjectMapper objectMapper = new ObjectMapper();
-//        JsonNode jsonNode = objectMapper.readTree(json);
-//        String projectId = jsonNode.path("projectId").asText();
-//        String type = jsonNode.path("type").asText();
-//        projectService.stopProject(projectId, type);
-//    }
-//
-//
-//}