夜得朦胧 1 year ago
parent
commit
9a7988c215
18 changed files with 1397 additions and 3 deletions
  1. 22 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/adapter/consumer/ProjectConsumer.java
  2. 5 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/adapter/controller/TaskController.java
  3. 29 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/entity/MultiProjectWaitQueueEntity.java
  4. 443 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/service/ProjectApplicationService.java
  5. 173 2
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/service/TaskApplicationService.java
  6. 342 1
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/domain/service/ProjectDomainService.java
  7. 13 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/configuration/kubernetes/KubernetesConfiguration.java
  8. 53 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/MultiSimulationProjectMapper.java
  9. 27 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/MultiSimulationProjectResultMapper.java
  10. 36 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/MultiSimulationProjectTaskRecordMapper.java
  11. 48 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/MultiSimulationSceneCarMapper.java
  12. 43 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/MultiSimulationSceneMapper.java
  13. 46 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/SimulationMapMapper.java
  14. 42 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/SimulationMapPathMapper.java
  15. 9 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/UserMapper.java
  16. 20 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/entity/MultiInfoEntity.java
  17. 24 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/entity/MultiTaskMessageEntity.java
  18. 22 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/fs/minio/MinioUtil.java

+ 22 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/adapter/consumer/ProjectConsumer.java

@@ -1,7 +1,9 @@
 package com.css.simulation.resource.scheduler.adapter.consumer;
 
 
+import api.common.pojo.param.project.MultiSimulationProjectKafkaParam;
 import api.common.util.JsonUtil;
+import com.alibaba.fastjson.JSONObject;
 import com.css.simulation.resource.scheduler.adapter.entity.ProjectStartMessageEntity;
 import com.css.simulation.resource.scheduler.adapter.entity.ProjectStopMessageEntity;
 import com.css.simulation.resource.scheduler.app.service.ProjectApplicationService;
@@ -44,5 +46,25 @@ public class ProjectConsumer {
         }
     }
 
+    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${custom.mq-start-multi-project-topic}")
+    public void acceptMultiMessage(ConsumerRecord<String, String> projectStartMessageRecord) {
+        log.info("消费者组 simulation-resource-scheduler 接收到项目开始消息:" + projectStartMessageRecord);
+        projectApplicationService.runMultiProject(JsonUtil.jsonToBean(projectStartMessageRecord.value(), MultiSimulationProjectKafkaParam.class));
+    }
+
+
+    /**
+     */
+    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${custom.mq-stop-multi-project-topic}")
+    public void stopMultiProject(ConsumerRecord<String, String> projectStopMessageRecord) {
+        log.info("消费者组 simulation-resource-scheduler 接收到的项目终止消息:" + projectStopMessageRecord);
+        try {
+            TimeUnit.SECONDS.sleep(10);
+            projectApplicationService.stopMultiProject(JsonUtil.jsonToBean(projectStopMessageRecord.value(), MultiSimulationProjectKafkaParam.class));
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 
 }

+ 5 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/adapter/controller/TaskController.java

@@ -46,5 +46,10 @@ public class TaskController {
         taskApplicationService.state(taskId, state, podName);
     }
 
+    @GetMapping("/multiState")
+    public void multiState(@RequestParam("taskId") String taskId, @RequestParam("state") String state, @RequestParam("podName") String podName) {
+        taskApplicationService.multiState(taskId, state, podName);
+    }
+
 
 }

+ 29 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/entity/MultiProjectWaitQueueEntity.java

@@ -0,0 +1,29 @@
+package com.css.simulation.resource.scheduler.app.entity;
+
+import api.common.pojo.param.project.MultiSimulationSceneKafkaParam;
+import com.css.simulation.resource.scheduler.adapter.entity.ProjectStartMessageEntity;
+import com.css.simulation.resource.scheduler.infra.entity.MultiTaskMessageEntity;
+import lombok.*;
+
+import java.util.List;
+
+@EqualsAndHashCode
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MultiProjectWaitQueueEntity {
+//    private String waitingType; //1等待执行 2等待扩充
+    private Integer waitingParallelism; // 等待扩充或执行的并行度
+
+    /**
+     * 执行到了哪一个
+     */
+    private Integer runState = -1;
+
+    private String projectId;
+
+    private List<MultiTaskMessageEntity> multiTaskMessageEntityList;
+
+    private List<MultiSimulationSceneKafkaParam> kafkaParamList;
+}

+ 443 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/service/ProjectApplicationService.java

@@ -1,9 +1,21 @@
 package com.css.simulation.resource.scheduler.app.service;
 
 import api.common.pojo.constants.DictConstants;
+import api.common.pojo.enums.MultiSimulationStatusEnum;
+import api.common.pojo.param.project.MultiCreateYamlRet;
+import api.common.pojo.param.project.MultiSimulationProjectKafkaParam;
+import api.common.pojo.param.project.MultiSimulationProjectParam;
+import api.common.pojo.param.project.MultiSimulationSceneKafkaParam;
+import api.common.pojo.po.project.MultiSimulationProjectTaskRecordPO;
+import api.common.pojo.vo.map.SimulationMapVO;
+import api.common.pojo.vo.project.MultiSimulationProjectVO;
+import api.common.pojo.vo.project.MultiSimulationSceneCarVO;
 import api.common.util.*;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import com.css.simulation.resource.scheduler.adapter.entity.ProjectStartMessageEntity;
 import com.css.simulation.resource.scheduler.adapter.entity.ProjectStopMessageEntity;
+import com.css.simulation.resource.scheduler.app.entity.MultiProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.app.entity.ProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.app.entity.VehicleEntity;
 import com.css.simulation.resource.scheduler.domain.service.ProjectDomainService;
@@ -34,6 +46,8 @@ import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.CollectionUtils;
 
 import javax.annotation.Resource;
 import java.io.File;
@@ -52,6 +66,11 @@ public class ProjectApplicationService {
     private String linuxTempPath;
     @Value("${scheduler.linux-path.pod-yaml-directory}")
     private String podYamlDirectory;
+
+    @Value("${scheduler.linux-path.multi-pod-yaml-directory}")
+    private String multiPodYamlDirectory;
+    @Value("${scheduler.linux-path.multi-vtd-xml-generator}")
+    private String multiVtdXmlGenerator;
     @Value("${minio.bucket-name}")
     private String bucketName;
 
@@ -100,6 +119,17 @@ public class ProjectApplicationService {
     private CustomRedisClient customRedisClient;
     @Resource
     private AlgorithmExpandMapper algorithmExpandMapper;
+    @Resource
+    private SimulationMapMapper mapMapper;
+    @Resource
+    private MultiSimulationSceneCarMapper sceneCarMapper;
+
+    @Resource
+    private MultiSimulationProjectMapper multiSimulationProjectMapper;
+
+    @Resource
+    private MultiSimulationProjectTaskRecordMapper taskRecordMapper;
+
 
     // -------------------------------- Comment --------------------------------
 
@@ -475,6 +505,123 @@ public class ProjectApplicationService {
         }
     }
 
+    public void waitMulti(MultiProjectWaitQueueEntity multiProjectWaitQueue) {
+        try {
+            //2 创建等待列表对象
+            final String waitingQueueJson = customRedisClient.get(DictConstants.MULTI_PROJECT_WAIT_QUEUE_KEY);
+            List<MultiProjectWaitQueueEntity> waitingQueue;
+            if (StringUtil.isEmpty(waitingQueueJson)) {
+                waitingQueue = new ArrayList<>();
+            } else {
+                waitingQueue = JsonUtil.jsonToList(waitingQueueJson, MultiProjectWaitQueueEntity.class);
+            }
+            boolean contains = false;
+            for (MultiProjectWaitQueueEntity waitQueueEntity : waitingQueue) {
+                if (waitQueueEntity.getProjectId().equals(multiProjectWaitQueue.getProjectId())) {
+                    contains = true;
+                    if (multiProjectWaitQueue.getWaitingParallelism() > 0){
+                        waitQueueEntity.setWaitingParallelism(multiProjectWaitQueue.getWaitingParallelism());
+                        waitQueueEntity.setMultiTaskMessageEntityList(multiProjectWaitQueue.getMultiTaskMessageEntityList());
+                        waitQueueEntity.setRunState(multiProjectWaitQueue.getRunState());
+                    }else {
+                        // 项目等待为0,则删除
+                        waitingQueue.remove(waitQueueEntity);
+                    }
+                }
+            }
+            if (!contains) {
+                waitingQueue.add(multiProjectWaitQueue);
+            }
+            String newWaitingQueueJson = JsonUtil.listToJson(waitingQueue);
+            customRedisClient.set(DictConstants.MULTI_PROJECT_WAIT_QUEUE_KEY, newWaitingQueueJson);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void removeMulti(String projectId) {
+        try {
+            //2 创建等待列表对象
+            final String waitingQueueJson = customRedisClient.get(DictConstants.MULTI_PROJECT_WAIT_QUEUE_KEY);
+            List<MultiProjectWaitQueueEntity> waitingQueue;
+            if (StringUtil.isEmpty(waitingQueueJson)) {
+                waitingQueue = new ArrayList<>();
+            } else {
+                waitingQueue = JsonUtil.jsonToList(waitingQueueJson, MultiProjectWaitQueueEntity.class);
+            }
+            for (MultiProjectWaitQueueEntity waitQueueEntity : waitingQueue) {
+                if (waitQueueEntity.getProjectId().equals(projectId)) {
+                    waitingQueue.remove(waitQueueEntity);
+                }
+            }
+            String newWaitingQueueJson = JsonUtil.listToJson(waitingQueue);
+            customRedisClient.set(DictConstants.MULTI_PROJECT_WAIT_QUEUE_KEY, newWaitingQueueJson);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @SneakyThrows
+    public void runMulti(int parallelism, MultiProjectWaitQueueEntity multiProjectWaitQueue, String isChoiceGpu) {
+        // 然后再判断是否执行完,未执行完的话则塞入redis,执行完则删除redis
+        // 启动成功之后,更新mysql
+        List<MultiTaskMessageEntity> multiTaskMessageEntityList = multiProjectWaitQueue.getMultiTaskMessageEntityList();
+        String projectId = multiProjectWaitQueue.getProjectId();
+
+        int parallel = parallelism;
+        if (multiTaskMessageEntityList.size()- multiProjectWaitQueue.getRunState()-1 < parallelism){
+            log.info("出现奇怪情况需要使用的并行度大于剩余任务数量parallelism:{},projectId:{},剩余:{}", parallelism, projectId, JSONObject.toJSONString(multiProjectWaitQueue));
+            parallel = multiTaskMessageEntityList.size();
+        }
+        Integer runState = multiProjectWaitQueue.getRunState();
+        // 使用完塞入
+        Map<String, Integer> multiNodeMapToUse = projectDomainService.getMultiNodeMapToUse(isChoiceGpu, parallel);
+        List<MultiCreateYamlRet> yamlList = new ArrayList<>();
+        for (int i = runState + 1; i < parallel + runState + 1; i++) {
+            MultiTaskMessageEntity messageEntity = multiTaskMessageEntityList.get(i);
+            String taskId = messageEntity.getInfo().getTask_id();
+            // 发送kafka消息
+            SendResult<String, String> stringStringSendResult = kafkaTemplate.send(projectId, i % multiTaskMessageEntityList.size(),
+                taskId, JSONObject.toJSONString(messageEntity)).get();
+            RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
+            String topic = recordMetadata.topic();  // 消息发送到的topic
+            int partition = recordMetadata.partition(); // 消息发送到的分区
+            long offset = recordMetadata.offset();  // 消息在分区内的offset
+            log.info("多模式仿真任务发送消息成功, 主题 topic 为项目ID:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset);
+//            String sceneId = messageEntity.getInfo().getScene_id();
+            String modelName =null;
+            for (String name: multiNodeMapToUse.keySet()) {
+                Integer integer = multiNodeMapToUse.get(name);
+                if (integer > 0){
+                    modelName = name;
+                    multiNodeMapToUse.put(name, integer -1);
+                }
+            }
+            if (modelName == null){
+                throw new RuntimeException("未选取到可用的节点");
+            }
+            MultiSimulationSceneKafkaParam multiSimulationSceneKafkaParam = multiProjectWaitQueue.getKafkaParamList().get(i);
+            MultiCreateYamlRet multiTempYaml = projectDomainService.createMultiTempYaml(projectId, multiSimulationSceneKafkaParam, messageEntity, modelName, partition, offset, isChoiceGpu);
+            multiTempYaml.setTaskId(messageEntity.getInfo().getTask_id());
+            multiTempYaml.setNodeName(modelName);
+            yamlList.add(multiTempYaml);
+        }
+        TimeUnit.SECONDS.sleep(10);
+        log.info("项目" + projectId + "共发送了" + yamlList.size() + "条消息,准备首先启动" + yamlList);
+        for (MultiCreateYamlRet redisKey : yamlList) {
+            projectDomainService.createMultiPodBegin(redisKey, redisKey.getYamlRedisKey());
+            // 存nodeName
+            stringRedisTemplate.opsForValue().set("multi-taskId:" + redisKey.getTaskId(), JSONObject.toJSONString(redisKey));
+            // 并行度减1
+            projectDomainService.decrementParallelism(isChoiceGpu, redisKey.getNodeName(), 1);
+        }
+        log.info("项目" + projectId + "已经启动" + yamlList.size());
+        for (MultiCreateYamlRet redisKey : yamlList) {
+            taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationStatusEnum.RUN_STATUS.getProjectStatus(), redisKey.getTaskId(), 0);
+        }
+        // 保存每个机器的并行度
+        projectDomainService.setMultiNodeMapUse(isChoiceGpu, multiNodeMapToUse);
+    }
 
     /**
      * 运行项目
@@ -875,5 +1022,301 @@ public class ProjectApplicationService {
         projectDomainService.checkAlgorithmIsExpand(projectType, projectId, DictConstants.ALGORITHM_EXPAND_STATUS_NOT_TESTED);
     }
 
+    @Async("pool1")
+    public void runMultiProject(MultiSimulationProjectKafkaParam projectStartMessageEntity) {
+        MultiProjectWaitQueueEntity multiTaskAndFixData = createMultiTaskAndFixData(projectStartMessageEntity);
+        checkIfCanRunMulti(multiTaskAndFixData);
+    }
+    @SneakyThrows
+//    @Transactional
+    public void stopMultiProject(MultiSimulationProjectKafkaParam projectKafkaParam) {
+        String projectId = projectKafkaParam.getProjectId();
+        // 删除等待队列中的项目,,
+        removeMulti(projectId);
+        String isChoiceGpu = DictConstants.USE_GPU;
+
+        MultiSimulationProjectVO projectVO = multiSimulationProjectMapper.selectMultiSimulationProjectById(projectId);
+        Integer status = projectVO.getProjectStatus();
+        MultiSimulationProjectParam multiSimulationProjectParam = new MultiSimulationProjectParam();
+        multiSimulationProjectParam.setProjectId(projectId);
+        String projectUserId = projectVO.getProjectUserId();
+        String clusterUserId = projectDomainService.getClusterUserIdByProjectUserId(projectUserId);
+
+        // 查看mysql项目是否已经运行
+        // 如果不为空,则代表已经执行,为空则代表kafka还未被消费
+        List<MultiSimulationProjectTaskRecordPO> recordPOList = taskRecordMapper.selectMultiSimulationProjectTaskRecordList(projectId);
+        if (!CollectionUtils.isEmpty(recordPOList)){
+            for (MultiSimulationProjectTaskRecordPO po: recordPOList) {
+                Integer recordStatus = po.getStatus();
+                if (recordStatus == MultiSimulationStatusEnum.RUN_STATUS.getProjectStatus()){
+                    taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationStatusEnum.TERMINATED_STATUS.getProjectStatus(), po.getId(), 1);
+                    // 删除pod
+                    String nodeNameKey = "multi-taskId:" + po.getId();
+                    String value = stringRedisTemplate.opsForValue().get(nodeNameKey);
+                    MultiCreateYamlRet multiCreateYamlRet = JSONObject.parseObject(value, MultiCreateYamlRet.class);
+
+                    String nodeName = multiCreateYamlRet.getNodeName();
+                    String podName = multiCreateYamlRet.getPodName();
+                    // 删除 pod
+                    projectDomainService.deleteMultiProjectPod(podName);
+                    // 节点并行度加一
+                    projectDomainService.incrementOneParallelism(isChoiceGpu, nodeName);
+                    // 释放证书
+                    projectDomainService.releaseLicense(clusterUserId, DictConstants.MODEL_TYPE_VTD, 1);
+                }
+            }
+            // 设置整个仿真项目
+            if (status == MultiSimulationStatusEnum.RUN_STATUS.getProjectStatus()){
+                multiSimulationProjectParam.setProjectStatus(MultiSimulationStatusEnum.TERMINATED_STATUS.getProjectStatus());
+                multiSimulationProjectMapper.updateMultiSimulationProjectStatus(multiSimulationProjectParam);
+            }
+
+            // 删除kafka topic
+            KafkaUtil.deleteTopic(kafkaAdminClient, projectId);
+            //6 删除项目 pod 启动文件
+            FileUtil.deleteFileBySubstring(multiPodYamlDirectory, projectId);
+            //7 删除项目临时文件
+            FileUtil.rm(linuxTempPath + "multiProject/" + projectId + "/");
+            // 删除minio临时文件
+            MinioUtil.rmR(minioClient, bucketName, projectResultPathOfMinio + projectId + "/");
+            // 删除算法key
+            // 删除yaml路径redis
+            // 删除记录podNamekey
+            RedisUtil.deleteByPrefix(stringRedisTemplate, "multi_project:" + projectId);
+        }
+
+    }
+
+    public MultiProjectWaitQueueEntity createMultiTaskAndFixData(MultiSimulationProjectKafkaParam projectStartMessageEntity) {
+        //* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
+        String projectId = projectStartMessageEntity.getProjectId();                // 手动执行项目 id 或 自动执行子项目 id
+        try {
+
+            List<MultiSimulationSceneKafkaParam> kafkaParamList = projectStartMessageEntity.getKafkaParamList();
+            
+            List<MultiTaskMessageEntity> entityList = new ArrayList<>();
+            for (MultiSimulationSceneKafkaParam kafkaParam: kafkaParamList) {
+                String taskId = StringUtil.getRandomUUID();
+                String mapId = kafkaParam.getMapId();
+                String sceneId = kafkaParam.getId();
+                String minioUploadPath = projectId + "/" + taskId + "/";
+                SimulationMapVO simulationMapVO = mapMapper.selectMapByMapId(mapId);
+                if (Objects.isNull(simulationMapVO)){
+                    throw new RuntimeException("地图" + mapId + "不存在");
+                }
+                String mapPath = simulationMapVO.getMapPath();
+                String mapOsgbPath = simulationMapVO.getMapOsgbPath();
+
+                String mapMinioPath = mapPath.substring(mapPath.indexOf("/mapFile"), mapPath.indexOf("?"));
+                String[] mapDriverSp = mapMinioPath.split("/");
+                String mapDriverSpName = mapDriverSp[mapDriverSp.length - 1];
+                String[] mapDriverNameSp = mapDriverSpName.split("\\.");
+                String mapDriverLast = mapDriverNameSp[mapDriverNameSp.length - 1];
+                String mapDriverLinuxPath = linuxTempPath + "multiProject/" + projectId + "/" + taskId + "/" + mapDriverSpName;
+                String mapDriverPathOfMinio = projectResultPathOfMinio + minioUploadPath + taskId + "." + mapDriverLast;
+                MinioUtil.downloadToFile(minioClient, bucketName, mapMinioPath, mapDriverLinuxPath);
+                MinioUtil.uploadFromFile(minioClient, mapDriverLinuxPath, bucketName, mapDriverPathOfMinio);
+                FileUtil.rm(mapDriverLinuxPath);   // 删除临时文件
+
+                String mapOsgMinioPath = mapOsgbPath.substring(mapOsgbPath.indexOf("/mapFile"), mapOsgbPath.indexOf("?"));
+                String[] mapOsgSp = mapOsgMinioPath.split("/");
+                String mapOsgSpName = mapOsgSp[mapOsgSp.length - 1];
+                String[] mapOsgSpNameSp = mapOsgSpName.split("\\.");
+                String mapOsgSpNameSpLast = mapOsgSpNameSp[mapOsgSpNameSp.length - 1];
+                String mapOsgLinuxPath = linuxTempPath + "multiProject/" + projectId + "/" + taskId + "/" + mapOsgSpName;
+                String mapOsgPathOfMinio = projectResultPathOfMinio + minioUploadPath + taskId + "." + mapOsgSpNameSpLast;
+                MinioUtil.downloadToFile(minioClient, bucketName, mapOsgMinioPath, mapOsgLinuxPath);
+                MinioUtil.uploadFromFile(minioClient, mapOsgLinuxPath, bucketName, mapOsgPathOfMinio);
+                FileUtil.rm(mapOsgLinuxPath);   // 删除临时文件
+
+                // 生成并上传xml
+                JSONObject carJson = new JSONObject();
+                carJson.put("numOfVehicle", kafkaParam.getSimulationSceneCarVOList().size());
+                JSONArray carArray = new JSONArray();
+
+                List<MultiSimulationSceneCarVO> simulationSceneCarVOList = kafkaParam.getSimulationSceneCarVOList();
+                if (CollectionUtils.isEmpty(simulationSceneCarVOList)){
+                    throw new RuntimeException("未配置车辆");
+                }
+                VehicleEntity vehicle = null;
+                for (MultiSimulationSceneCarVO sceneCar: simulationSceneCarVOList) {
+                    String sceneCarId = sceneCar.getId();
+                    // 处理算法id
+                    String algorithmId = sceneCar.getAlgorithmId();
+                    log.info("项目:" + projectId + ",场景:" + sceneId + ",车辆id:" + sceneCarId + ",开始算法导入。");
+                    String algorithmDockerImage = handleAlgorithm(sceneId, algorithmId);
+                    log.info("项目:" + projectId + ",场景:" + sceneId + ",车辆id:" + sceneCarId + ",算法已导入" + algorithmDockerImage);
+//                    result.put("sceneCarId-" + sceneCarId + "-docker-image", algorithmDockerImage);
+                    customRedisClient.set(projectDomainService.getMultiAlgorithmIdRedisKey(algorithmId, projectId), algorithmDockerImage);
+                    // 处理车辆,一个场景的车辆是一样的,只需取第一个车辆即可
+                    if (vehicle == null){
+                        String carId = sceneCar.getCarId();
+                        //1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
+                        com.css.simulation.resource.scheduler.infra.entity.VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(carId);   // 车辆
+                        List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(carId);    // 摄像头
+                        List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(carId); // 完美传感器
+                        vehicle = VehicleEntity.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build())
+                            .dynamics(DynamicsEntity.builder().dynamics_maxspeed(vehicleEntity.getMaxSpeed()).dynamics_enginepower(vehicleEntity.getEnginePower()).dynamics_maxdecel(vehicleEntity.getMaxDeceleration()).dynamics_maxsteering(vehicleEntity.getMaxSteeringAngle()).dynamics_mass(vehicleEntity.getMass()).dynamics_frontsurfaceeffective(vehicleEntity.getFrontSurfaceEffective()).dynamics_airdragcoefficient(vehicleEntity.getAirDragCoefficient()).dynamics_rollingresistance(vehicleEntity.getRollingResistanceCoefficient()).dynamics_wheeldiameter(vehicleEntity.getWheelDiameter()).dynamics_wheeldrive(vehicleEntity.getWheelDrive()).dynamics_overallefficiency(vehicleEntity.getOverallEfficiency()).dynamics_distfront(vehicleEntity.getFrontDistance()).dynamics_distrear(vehicleEntity.getRearDistance()).dynamics_distleft(vehicleEntity.getLeftDistance()).dynamics_distright(vehicleEntity.getRightDistance()).dynamics_distheight(vehicleEntity.getHeightDistance()).dynamics_wheelbase(vehicleEntity.getWheelbase()).build())
+                            .sensors(SensorsEntity.builder()   // 根据 vehicleId 查询绑定的传感器列表
+                                .camera(cameraEntityList).OGT(ogtEntityList).build()).build();
+                    }
+                    String pathStart = sceneCar.getPathStart();
+                    JSONObject jsonObject = JSONObject.parseObject(pathStart);
+                    jsonObject.remove("name");
+                    carArray.add(jsonObject);
+                }
+                carJson.put("position", carArray);
+                // 生成xml文件
+                String mapXmlLinuxPath = linuxTempPath + "multiProject/" + projectId + "/" + taskId + "/" + taskId + ".xml";
+                String mapJsonLinuxPath = linuxTempPath + "multiProject/" + projectId + "/" + taskId + "/" + taskId + ".json";
+                FileUtil.writeStringToLocalFile(carJson.toJSONString(), mapJsonLinuxPath);
+                String linuxComm = multiVtdXmlGenerator + " " + mapXmlLinuxPath + " " + mapXmlLinuxPath;
+                LinuxUtil.execute(linuxComm);
+                String mapXmlPathOfMinio = projectResultPathOfMinio + minioUploadPath + taskId + ".xml";
+                MinioUtil.uploadFromFile(minioClient, mapXmlLinuxPath, bucketName, mapXmlPathOfMinio);
+                FileUtil.rm(mapXmlLinuxPath);
+                FileUtil.rm(mapJsonLinuxPath);
+
+                MultiTaskMessageEntity build = MultiTaskMessageEntity.builder().info(MultiInfoEntity.builder().project_id(projectId).task_id(taskId).scene_id(sceneId)
+                        .default_time(Long.valueOf(projectStartMessageEntity.getDefaultTime()))
+                        .task_path(projectResultPathOfMinio + minioUploadPath).build())
+                    .scenario(ScenarioEntity.builder().scenario_osc(mapXmlPathOfMinio).scenario_odr(mapDriverPathOfMinio).scenario_osgb(mapOsgPathOfMinio).build())
+                    .vehicleEntity(vehicle)
+                    .build();
+                entityList.add(build);
+            }
+            int sort = 0;
+            for (MultiTaskMessageEntity entity: entityList) {
+                taskRecordMapper.addMultiSimulationProjectTaskRecord(entity.getInfo().getTask_id(), entity.getInfo().getScene_id(), entity.getInfo().getProject_id(),
+                    JSONObject.toJSONString(entity), sort);
+                sort ++;
+            }
+            log.info("project:{},共插入{}条数据", projectId, entityList.size());
+            MultiProjectWaitQueueEntity build = MultiProjectWaitQueueEntity.builder()
+                .multiTaskMessageEntityList(entityList)
+                .projectId(projectId)
+                .waitingParallelism(entityList.size())
+                .kafkaParamList(kafkaParamList)
+                .build();
+            return build;
+        } catch (Exception e) {
+            log.error("项目报错。", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+
+
+    @SneakyThrows
+    // TODO 此处加锁
+    public void checkIfCanRunMulti(MultiProjectWaitQueueEntity projectWaitQueueEntity) {
+//        List<MultiTaskMessageEntity> multiTaskMessageEntityList = projectWaitQueueEntity.getMultiTaskMessageEntityList();
+        //1 项目信息
+        int parallelism = projectWaitQueueEntity.getWaitingParallelism();
+        if (parallelism <=0){
+            log.info("需要只需的项目并行度为0");
+        }
+        String isChoiceGpu = DictConstants.USE_GPU;
+        final UserEntity userEntity = projectDomainService.getUserEntityByMultiProjectId(projectWaitQueueEntity.getProjectId());
+        String projectUserId = userEntity.getId();
+        String roleCode = userEntity.getRoleCode();
+        String useType = userEntity.getUseType();
+
+        ClusterEntity clusterEntity = null;
+        String clusterUserId;  // 项目实际运行使用的用户集群
+
+        if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
+            clusterUserId = DictConstants.SYSTEM_USER_ID;
+        } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
+            clusterUserId = projectUserId;
+            clusterEntity = clusterMapper.selectByUserId(clusterUserId);
+
+        } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
+            if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) {   //3-3 普通子账户,根据自己的独占节点排队
+                clusterUserId = projectUserId;
+                clusterEntity = clusterMapper.selectByUserId(clusterUserId);
+            } else if (DictConstants.USER_TYPE_PUBLIC.equals(useType)) {    //3-4 共享子账户,根据父账户的共享节点排队
+                clusterUserId = userEntity.getCreateUserId();
+                clusterEntity = clusterMapper.selectByUserId(clusterUserId);
+            } else {
+                throw new RuntimeException("用户" + projectUserId + "未知占用类型:" + useType);
+            }
+        } else {
+            throw new RuntimeException("未知角色类型:" + roleCode);
+        }
+        int remainderSimulationLicense = Integer.MAX_VALUE;
+        if (!clusterUserId.equals(DictConstants.SYSTEM_USER_ID)){
+            // 获取仿真软件证书数量和动力学软件证书数量(vtd占一个仿真证书,carsim各占一个)
+            Integer usingSimulationLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_SIMULATION);
+            Integer numSimulationLicense = clusterEntity.getNumSimulationLicense();
+            // 判断仿真证书是否够用,如果证书为0则将项目加入等待队列;如果证书小于并行度则加入扩充队列,并用现有证书执行;如果证书够用,直接执行。
+            remainderSimulationLicense = numSimulationLicense - usingSimulationLicenseNumber;
+        }
+
+        //2 剩余并行度
+        // 考虑仿真证书数量
+        // 剩余并行度
+        int remainderParallelism = projectDomainService.getRemainderMultiParallelism(isChoiceGpu);
+        log.info("计算出剩余可执行的并行度:{}", remainderParallelism);
+        boolean needWait = false;
+        if (DictConstants.SYSTEM_USER_ID.equals(clusterUserId)){
+            // 不需要判断证书
+        } else{
+            if (remainderSimulationLicense <= 0){
+                log.info("multiProjectId:{},仿真数量不够用clusterUserId:{}", projectWaitQueueEntity.getProjectId(), clusterUserId);
+                needWait = true;
+            }
+        }
+//        else if (projectWaitQueueEntity.getRunState() <0 && remainderParallelism >0){
+//            // 首次执行,且可以执行,不进入等待队列
+//            if (remainderSimulationLicense <= 0){
+//                needWait = true;
+//            } else {
+//                // 消耗一个证书
+//                if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)){
+//                    projectDomainService.useLicense(clusterUserId, 1);
+//                }
+//            }
+//        }
+        if (remainderParallelism <= 0 || needWait) {
+            waitMulti(projectWaitQueueEntity);
+        } else if (remainderParallelism < parallelism) {
+            // 初始执行
+            if (projectWaitQueueEntity.getRunState() <0){
+                log.info("多模式仿真初始执行,创建kafka topic:{}", projectWaitQueueEntity.getProjectId());
+                KafkaUtil.createTopic(kafkaAdminClient, projectWaitQueueEntity.getProjectId(), projectWaitQueueEntity.getMultiTaskMessageEntityList().size(), (short) 1);   // 创建主题
+                TimeUnit.SECONDS.sleep(5);
+                if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)){
+                    projectDomainService.useLicense(clusterUserId, 1);
+                }
+            }
+            // 执行完之后删除之前的list.
+            // 现有逻辑不需要不需要再删除list了
+            runMulti(remainderParallelism, projectWaitQueueEntity, isChoiceGpu);
+            Integer runState = projectWaitQueueEntity.getRunState();
+            int runSt = remainderParallelism + runState;
+            waitMulti(MultiProjectWaitQueueEntity.builder().waitingParallelism(parallelism - remainderParallelism)
+                .runState(runSt)
+                .multiTaskMessageEntityList(projectWaitQueueEntity.getMultiTaskMessageEntityList())
+                .kafkaParamList(projectWaitQueueEntity.getKafkaParamList())
+                .build());
+        } else {
+            if (projectWaitQueueEntity.getRunState() <0){
+                log.info("多模式仿真初始执行,剩余并行度够用,创建kafka topic:{}", projectWaitQueueEntity.getProjectId());
+                KafkaUtil.createTopic(kafkaAdminClient, projectWaitQueueEntity.getProjectId(), projectWaitQueueEntity.getMultiTaskMessageEntityList().size(), (short) 1);   // 创建主题
+                TimeUnit.SECONDS.sleep(5);
+                if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)){
+                    projectDomainService.useLicense(clusterUserId, 1);
+                }
+            }
+            runMulti(parallelism, projectWaitQueueEntity, isChoiceGpu);
+            // 能执行完也需要删除之前redis key
+            waitMulti(MultiProjectWaitQueueEntity.builder().waitingParallelism(0)
+                    .runState(projectWaitQueueEntity.getMultiTaskMessageEntityList().size() -1)
+                    .kafkaParamList(projectWaitQueueEntity.getKafkaParamList())
+                    .multiTaskMessageEntityList(projectWaitQueueEntity.getMultiTaskMessageEntityList()).build()
+            );
+        }
+    }
 
 }

+ 173 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/service/TaskApplicationService.java

@@ -1,13 +1,24 @@
 package com.css.simulation.resource.scheduler.app.service;
 
 import api.common.pojo.constants.DictConstants;
-import api.common.util.HttpUtil;
-import api.common.util.TimeUtil;
+import api.common.pojo.enums.MultiSimulationResultTypeEnum;
+import api.common.pojo.enums.MultiSimulationStatusEnum;
+import api.common.pojo.param.project.MultiCreateYamlRet;
+import api.common.pojo.param.project.MultiSimulationProjectParam;
+import api.common.pojo.po.project.MultiSimulationProjectResultPO;
+import api.common.pojo.po.project.MultiSimulationProjectTaskRecordPO;
+import api.common.pojo.vo.project.MultiSimulationProjectVO;
+import api.common.util.*;
+import com.alibaba.fastjson.JSONObject;
 import com.css.simulation.resource.scheduler.domain.service.ProjectDomainService;
 import com.css.simulation.resource.scheduler.domain.service.TaskDomainService;
 import com.css.simulation.resource.scheduler.infra.configuration.custom.CustomConfiguration;
 import com.css.simulation.resource.scheduler.infra.configuration.kubernetes.KubernetesConfiguration;
 import com.css.simulation.resource.scheduler.infra.configuration.kubernetes.KubernetesUtil;
+import com.css.simulation.resource.scheduler.infra.db.mysql.mapper.MultiSimulationProjectMapper;
+import com.css.simulation.resource.scheduler.infra.db.mysql.mapper.MultiSimulationProjectResultMapper;
+import com.css.simulation.resource.scheduler.infra.db.mysql.mapper.MultiSimulationProjectTaskRecordMapper;
+import com.css.simulation.resource.scheduler.infra.entity.MultiTaskMessageEntity;
 import com.css.simulation.resource.scheduler.infra.fs.minio.MinioUtil;
 import com.css.simulation.resource.scheduler.infra.db.redis.CustomRedisClient;
 import com.css.simulation.resource.scheduler.infra.entity.PrefixEntity;
@@ -15,15 +26,21 @@ import com.css.simulation.resource.scheduler.infra.entity.ProjectEntity;
 import com.css.simulation.resource.scheduler.infra.entity.TaskEntity;
 import com.css.simulation.resource.scheduler.infra.db.mysql.mapper.SimulationManualProjectTaskMapper;
 import com.css.simulation.resource.scheduler.infra.db.redis.RedisUtil;
+import com.css.simulation.resource.scheduler.infra.mq.kafka.KafkaUtil;
 import io.kubernetes.client.openapi.ApiClient;
 import io.minio.MinioClient;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.admin.Admin;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
 
 import javax.annotation.Resource;
+import java.io.File;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 @Service
@@ -37,6 +54,9 @@ public class TaskApplicationService {
     private StringRedisTemplate stringRedisTemplate;
     @Resource
     private SimulationManualProjectTaskMapper simulationManualProjectTaskMapper;
+    @Resource
+    private MultiSimulationProjectTaskRecordMapper multiTaskRecordMapper;
+
     @Resource
     private MinioClient minioClient;
     @Resource
@@ -52,6 +72,25 @@ public class TaskApplicationService {
     @Resource
     private TaskDomainService taskDomainService;
 
+    @Resource
+    private MultiSimulationProjectMapper multiSimulationProjectMapper;
+    @Resource
+    private MultiSimulationProjectTaskRecordMapper taskRecordMapper;
+    @Resource
+    private MultiSimulationProjectResultMapper multiSimulationResultMapper;
+
+    @Resource(name = "myKafkaAdmin")
+    private Admin kafkaAdminClient;
+    @Value("${scheduler.linux-path.multi-pod-yaml-directory}")
+    private String multiPodYamlDirectory;
+    @Value("${scheduler.linux-path.temp}")
+    private String linuxTempPath;
+    @Value("${scheduler.minio-path.project-result}")
+    private String projectResultPathOfMinio;
+
+    //
+    @Value("${scheduler.linux-path.multi-vtd-pod-ana-py-file}")
+    private String multiVtdPodTemplateAnaPy;
 
     // -------------------------------- Comment --------------------------------
 
@@ -180,6 +219,138 @@ public class TaskApplicationService {
 
     }
 
+    @Async
+    public void multiState(String taskId, String state, String podName) {
+        MultiSimulationProjectTaskRecordPO taskEntity = multiTaskRecordMapper.selectMultiSimulationProjectTaskRecordById(taskId);
+        if (taskEntity == null) {
+            log.info("多模式仿真收到不存在的任务的状态消息:" + taskId);
+            return;
+        }
+
+        String projectId = taskEntity.getProjectId(); // 项目 id
+        String minioUploadPath = projectId + "/" + taskId + "/";
+
+        String lock1 = "taskId:" + taskId + ":state:" + state + ":pod-name:" + podName;
+        customRedisClient.lock(lock1, 1L, 30 * 60L);
+        String isChoiceGpu = DictConstants.USE_GPU;
+        try {
+
+            MultiSimulationProjectVO projectVO = multiSimulationProjectMapper.selectMultiSimulationProjectById(projectId);
+//            Integer status = projectVO.getProjectStatus();
+            MultiSimulationProjectParam multiSimulationProjectParam = new MultiSimulationProjectParam();
+            multiSimulationProjectParam.setProjectId(projectId);
+            String projectUserId = projectVO.getProjectUserId();
+            String clusterUserId = projectDomainService.getClusterUserIdByProjectUserId(projectUserId);
+            // 查询相关信息
+            String nodeNameKey = "multi-taskId:" + taskEntity.getId();
+            String value = stringRedisTemplate.opsForValue().get(nodeNameKey);
+            MultiCreateYamlRet multiCreateYamlRet = JSONObject.parseObject(value, MultiCreateYamlRet.class);
+            String nodeName = multiCreateYamlRet.getNodeName();
+            if (DictConstants.TASK_RUNNING.equals(state)) {  // 运行中的 pod 无需删除
+                // 将运行中的任务的 pod 名称放入 redis
+                log.info("修改任务 " + taskId + " 的状态为 " + state + ",pod 名称为:" + podName);
+                multiTaskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationStatusEnum.RUN_STATUS.getProjectStatus(),
+                    taskEntity.getId(), 0);
+                return;
+            }else {
+                // 结束的 pod 都直接删除,并判断项目是否完成
+                log.info("修改任务 {} 的状态为 {} ,pod 名称为 {} ,并删除 pod。", taskId, state, podName);
+                // 已完成
+                // 删除pod
+                KubernetesUtil.deletePod2(apiClient, kubernetesConfiguration.getMultiNamespace(), podName);
+                // 并行度加一
+                projectDomainService.incrementOneParallelism(isChoiceGpu, nodeName);
+                // 释放证书
+                projectDomainService.releaseLicense(projectDomainService.getClusterUserIdByProjectUserId(projectUserId), DictConstants.MODEL_TYPE_VTD, 1);
+                if (DictConstants.TASK_COMPLETED.equals(state)) {
+                    // 更新状态
+                    taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationStatusEnum.COMPLETED_STATUS.getProjectStatus(), taskEntity.getId(), 0);
+                    // 进行仿真评价
+                    String taskBody = taskEntity.getTaskBody();
+                    MultiTaskMessageEntity messageEntity = JSONObject.parseObject(taskBody, MultiTaskMessageEntity.class);
+                    String taskPath = messageEntity.getInfo().getTask_path();
+                    List<String> list = MinioUtil.listAllFileName(minioClient, bucketName, taskPath);
+                    String csvName = null;
+                    for (String str : list) {
+                        if (StringUtils.contains(str, "csv")) {
+                            csvName = str;
+                        }
+                    }
+                    // 文件存在
+                    if (StringUtils.isNotBlank(csvName)) {
+                        String linuxPath = linuxTempPath + "multiProject/" + projectId + "/" + taskId + "/";
+                        String linuxFile = linuxPath + csvName;
+                        MinioUtil.downloadToFile(minioClient, bucketName, taskPath + csvName, linuxFile);
+                        String pythonCom = "python3 " + multiVtdPodTemplateAnaPy + " --csvFile=\"" + linuxPath + "\"" + " --outputResultFile=\"" + linuxPath + "\"";
+                        LinuxUtil.execute(pythonCom);
+                        Thread.sleep(10000);
+                        // 获取到json结果,并插入数据库
+                        String podString = FileUtil.read(new File(linuxPath + "result.json"));
+                        JSONObject jsonObject = JSONObject.parseObject(podString).getJSONObject("scenario");
+                        MultiSimulationProjectResultPO multiSimulationProjectResultPO = new MultiSimulationProjectResultPO();
+                        multiSimulationProjectResultPO.setId(StringUtil.getRandomUUID()).setSceneId(taskEntity.getSceneId())
+                            .setAbnormalTimeDescription(jsonObject.getString("collision_time")).setAbnormalType(MultiSimulationResultTypeEnum.COLLISION.getResultType());
+                        multiSimulationResultMapper.insertProjectResult(multiSimulationProjectResultPO);
+                        multiSimulationProjectResultPO = new MultiSimulationProjectResultPO();
+                        multiSimulationProjectResultPO.setId(StringUtil.getRandomUUID()).setSceneId(taskEntity.getSceneId())
+                            .setAbnormalTimeDescription(jsonObject.getString("line_crossing_time")).setAbnormalType(MultiSimulationResultTypeEnum.ABNORMAL_PARKING.getResultType());
+                        multiSimulationResultMapper.insertProjectResult(multiSimulationProjectResultPO);
+                        multiSimulationProjectResultPO = new MultiSimulationProjectResultPO();
+                        multiSimulationProjectResultPO.setId(StringUtil.getRandomUUID()).setSceneId(taskEntity.getSceneId())
+                            .setAbnormalTimeDescription(jsonObject.getString("lane_departure")).setAbnormalType(MultiSimulationResultTypeEnum.OUT_OF_PAVEMENT.getResultType());
+                        multiSimulationResultMapper.insertProjectResult(multiSimulationProjectResultPO);
+
+                        multiSimulationProjectResultPO = new MultiSimulationProjectResultPO();
+                        multiSimulationProjectResultPO.setId(StringUtil.getRandomUUID()).setSceneId(taskEntity.getSceneId())
+                            .setAbnormalTimeDescription(jsonObject.getString("phrases")).setAbnormalType(MultiSimulationResultTypeEnum.LAST_DESCRIPTION.getResultType());
+                        multiSimulationResultMapper.insertProjectResult(multiSimulationProjectResultPO);
+                        // TODO 生成仿真视频url
+
+                    } else {
+                        log.info("taskId:{}未找到csv文件", taskId);
+                    }
+                } else {
+                    // 终止
+                    taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationStatusEnum.TERMINATED_STATUS.getProjectStatus(), taskEntity.getId(), 0);
+                    log.info("taskId:{},项目已经停止", taskId);
+                }
+            }
+            List<MultiSimulationProjectTaskRecordPO> recordPOList = multiTaskRecordMapper.selectMultiSimulationProjectTaskRecordList(projectId);
+            // 如果所有的都已经完成,则进行后续处理
+            boolean allCom = true;
+            if (!CollectionUtils.isEmpty(recordPOList)){
+                for (MultiSimulationProjectTaskRecordPO record: recordPOList) {
+                    Integer status = record.getStatus();
+                    if (status == MultiSimulationStatusEnum.INIT_STATUS.getProjectStatus() || status == MultiSimulationStatusEnum.RUN_STATUS.getProjectStatus()){
+                        allCom = false;
+                    }
+                }
+            }
+            if (allCom){
+                multiSimulationProjectParam.setProjectStatus(MultiSimulationStatusEnum.COMPLETED_STATUS.getProjectStatus());
+                multiSimulationProjectMapper.updateMultiSimulationProjectStatus(multiSimulationProjectParam);
+                // 删除kafka topic
+                KafkaUtil.deleteTopic(kafkaAdminClient, projectId);
+                //6 删除项目 pod 启动文件
+                FileUtil.deleteFileBySubstring(multiPodYamlDirectory, projectId);
+                //7 删除项目临时文件
+                FileUtil.rm(linuxTempPath + "multiProject/" + projectId + "/");
+                // 删除minio临时文件
+                MinioUtil.rmR(minioClient, bucketName, projectResultPathOfMinio + projectId + "/");
+                // 删除算法key
+                // 删除yaml路径redis
+                // 删除记录podNamekey
+                RedisUtil.deleteByPrefix(stringRedisTemplate, "multi_project:" + projectId);
+            }
+        } catch (io.kubernetes.client.openapi.ApiException apiException) {
+            log.info("POD:" + podName + "已删除。");
+        } catch (Exception e) {
+            log.error("项目 {} 已结束。", projectId, e);
+        } finally {
+            customRedisClient.unlock(lock1);
+        }
+    }
+
 
     public Boolean confirm(String taskId) {
         return taskDomainService.taskConfirm(taskId);

+ 342 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/domain/service/ProjectDomainService.java

@@ -2,7 +2,11 @@ package com.css.simulation.resource.scheduler.domain.service;
 
 import api.common.pojo.common.ResponseBodyVO;
 import api.common.pojo.constants.DictConstants;
+import api.common.pojo.param.project.MultiCreateYamlRet;
+import api.common.pojo.param.project.MultiSimulationSceneKafkaParam;
+import api.common.pojo.vo.project.MultiSimulationSceneCarVO;
 import api.common.util.*;
+import com.alibaba.fastjson.JSONObject;
 import com.css.simulation.resource.scheduler.adapter.entity.ProjectStartMessageEntity;
 import com.css.simulation.resource.scheduler.app.entity.ProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.infra.configuration.custom.CustomConfiguration;
@@ -14,6 +18,7 @@ import com.css.simulation.resource.scheduler.infra.db.entity.SimulationAutomatic
 import com.css.simulation.resource.scheduler.infra.db.entity.SimulationAutomaticSubprojectEntity;
 import com.css.simulation.resource.scheduler.infra.db.entity.SimulationManualProjectEntity;
 import com.css.simulation.resource.scheduler.infra.db.mysql.mapper.*;
+import com.css.simulation.resource.scheduler.infra.entity.MultiTaskMessageEntity;
 import com.css.simulation.resource.scheduler.infra.entity.PrefixEntity;
 import com.css.simulation.resource.scheduler.infra.entity.ProjectEntity;
 import com.css.simulation.resource.scheduler.infra.entity.UserEntity;
@@ -56,6 +61,23 @@ public class ProjectDomainService {
     @Value("${spring.kafka.bootstrap-servers}")
     private String kafkaIp;
 
+
+    /**
+     * 以下是多模式仿真任务
+     */
+    @Value("${scheduler.linux-path.multi-pod-yaml-directory}")
+    private String multiPodYamlDirectory;
+    @Value("${scheduler.linux-path.multi-vtd-pod-template-first-yaml}")
+    private String multiVtdPodTemplateFirstYaml;
+    @Value("${scheduler.linux-path.multi-vtd-pod-template-last-yaml}")
+    private String multiVtdPodTemplateLastYaml;
+
+    @Value("${scheduler.linux-path.multi-vtd-pod-template-init-yaml}")
+    private String multiVtdPodTemplateInitYaml;
+
+    @Value("${scheduler.linux-path.multi-vtd-xml-generator}")
+    private String multiVtdXmlGenerator;
+
     // -------------------------------- Comment --------------------------------
     @Resource
     private StringRedisTemplate stringRedisTemplate;
@@ -189,6 +211,79 @@ public class ProjectDomainService {
         return yamlRedisKey;
     }
 
+    @SneakyThrows
+    public MultiCreateYamlRet createMultiTempYaml(String projectId, MultiSimulationSceneKafkaParam multiSimulationSceneKafkaParam, MultiTaskMessageEntity messageEntity, String nodeName, int kafkaPartition, long kafkaOffset, String isChoiceGpu) {
+        String podName = getMultiRandomPodName(projectId);   // 生成 podName
+        String podYaml = getMultiPodYamlName(nodeName, podName);     // 模板文件名称
+        String yamlPath = multiPodYamlDirectory + podYaml;
+        String podString = FileUtil.read(new File(multiVtdPodTemplateFirstYaml));
+        String algorithmStr = FileUtil.read(new File(multiVtdPodTemplateLastYaml));
+        String algorithmInitStr = FileUtil.read(new File(multiVtdPodTemplateInitYaml));
+
+        podString = podString.replace("pod-name", podName);
+        podString = podString.replace("namespace-name", kubernetesConfiguration.getMultiNamespace());
+        podString = podString.replace("node-name", nodeName);
+        podString = podString.replace("vtd-container", "vtd-" + projectId + "-" + messageEntity.getInfo().getScene_id());
+        podString = podString.replace("vtd-image", kubernetesConfiguration.getMultiImageVtdGpu());
+        podString = podString.replace("simulation-cloud-ip", simulationCloudIp);
+        podString = podString.replace("kafka-ip", kafkaIp);
+        podString = podString.replace("minio-ip", minioConfiguration.getEndpointWithoutHttp());
+        podString = podString.replace("minio-access-key", minioConfiguration.getAccessKey());
+        podString = podString.replace("minio-secret-key", minioConfiguration.getSecretKey());
+        podString = podString.replace("kafka-partition", String.valueOf(kafkaPartition));
+        podString = podString.replace("kafka-offset", String.valueOf(kafkaOffset));
+        podString = podString.replace("cpu-order", "1");
+        List<MultiSimulationSceneCarVO> simulationSceneCarVOList = multiSimulationSceneKafkaParam.getSimulationSceneCarVOList();
+
+        podString = podString.replace("num-of-vehicle", String.valueOf(simulationSceneCarVOList.size()));
+        int cameraVehicleId = 0;
+        for (int i = 0; i < simulationSceneCarVOList.size(); i++) {
+            Integer isSimulationCar = simulationSceneCarVOList.get(i).getIsSimulationCar();
+            if (isSimulationCar == 1){
+                cameraVehicleId = i + 1;
+            }
+        }
+        podString = podString.replace("camera-vehicle-id", String.valueOf(cameraVehicleId));
+
+        StringBuffer stringBuffer = new StringBuffer();
+        StringBuffer stringBufferInit = new StringBuffer();
+        int basePort = 11310;
+        for (int i = 0; i < simulationSceneCarVOList.size(); i++) {
+            MultiSimulationSceneCarVO car = simulationSceneCarVOList.get(i);
+            String temp = algorithmStr.replace("algorithm-container", "algorithm-" + projectId + "-" + messageEntity.getInfo().getScene_id() + "-" + car.getId());
+            String algorithmId = car.getAlgorithmId();
+            String algorithmImage = customRedisClient.get(getMultiAlgorithmIdRedisKey(algorithmId, projectId));
+            temp = temp.replace("algorithm-image", algorithmImage);
+            temp = temp.replace("ros-master-uri", "http://localhost:" + String.valueOf(basePort + i + 1));
+            temp = temp.replace("vehicle-id", String.valueOf(i+1));
+            String pathEnd = car.getPathEnd();
+            JSONObject object = JSONObject.parseObject(pathEnd);
+            temp = temp.replace("end-coord", String.valueOf(object.get("x")) + "," + String.valueOf(object.get("y")));
+            stringBuffer.append(temp);
+
+            String tempInit = algorithmInitStr.replace("algorithm-image", algorithmImage);
+            stringBufferInit.append(tempInit);
+        }
+        podString.replace("algorithmContainerDemo", stringBuffer.toString());
+        podString.replace("algorithmContainerInit", stringBufferInit.toString());
+
+        log.info("保存项目projectId:" + projectId + "sceneId:" + messageEntity.getInfo().getScene_id() + "的 yaml 文件:" + yamlPath);
+        FileUtil.writeStringToLocalFile(podString, yamlPath);
+        // 保存 yaml 地址
+        String yamlRedisKey = "multi_project:" + projectId+":scene:" + messageEntity.getInfo().getScene_id() + ":node:" + nodeName + ":yaml:" + podName;
+        stringRedisTemplate.opsForValue().set(yamlRedisKey, yamlPath);
+        MultiCreateYamlRet multiCreateYamlRet = new MultiCreateYamlRet();
+        multiCreateYamlRet.setProjectId(projectId);
+        multiCreateYamlRet.setSceneId(messageEntity.getInfo().getScene_id());
+        multiCreateYamlRet.setYamlRedisKey(yamlRedisKey);
+        multiCreateYamlRet.setPodName(podName);
+        return multiCreateYamlRet;
+    }
+
+    public String getMultiAlgorithmIdRedisKey(String algorithmId,String projectId){
+        return "multi_project:" + projectId + ":algorithmId:" + algorithmId + ":docker-image";
+    }
+
 
     public String getIsChoiceGpuByProjectId(String projectId) {
         return getProjectByProjectId(projectId).getIsChoiceGpu();
@@ -236,6 +331,13 @@ public class ProjectDomainService {
         return nodeName + "#" + podName + ".yaml";
     }
 
+    public String getMultiRandomPodName(String projectId) {
+        return "multi-project-" + projectId + "-" + StringUtil.getRandomEightBitUUID();
+    }
+
+    public String getMultiPodYamlName(String nodeName, String podName) {
+        return nodeName + "#" + podName + ".yaml";
+    }
 
     public void deletePod(String podName) {
         try {
@@ -251,6 +353,20 @@ public class ProjectDomainService {
         }
     }
 
+    public void deleteMultiProjectPod(String podName) {
+        try {
+            // 先删除 redis key
+            KubernetesUtil.deletePod(apiClient, kubernetesConfiguration.getMultiNamespace(), podName);
+            log.info("等待 pod " + podName + " 的资源释放完成。");
+            TimeUnit.SECONDS.sleep(3);
+        } catch (ApiException apiException) {
+            log.info("pod " + podName + " 已删除。");
+        } catch (Exception e) {
+            e.printStackTrace();
+            log.error("删除 pod " + podName + " 报错。", e);
+        }
+    }
+
 
     public String getNodeNameOfPod(String projectId, String podName) {
         String key = "project:" + projectId + ":pod:" + podName + ":node";
@@ -261,6 +377,15 @@ public class ProjectDomainService {
         return s;
     }
 
+    public String getNodeNameOfMultiProjectPod(String projectId, String podName) {
+        String key = "project:" + projectId + ":pod:" + podName + ":node";
+        final String s = stringRedisTemplate.opsForValue().get(key);
+        if (StringUtil.isEmpty(s)) {
+            throw new RuntimeException("无法获取 pod 运行所在节点:" + key);
+        }
+        return s;
+    }
+
     public String getNodeNameOfPod2(String projectId, String podName) {
         String key = "project:" + projectId + ":pod:" + podName + ":node";
         final String s = stringRedisTemplate.opsForValue().get(key);
@@ -321,6 +446,18 @@ public class ProjectDomainService {
         new Thread(() -> KubernetesUtil.applyYaml(podYamlPath), "apply-" + podName).start();
     }
 
+    public void createMultiPodBegin(MultiCreateYamlRet multiCreateYamlRet, String redisKey) {
+        final String podYamlPath = stringRedisTemplate.opsForValue().get(redisKey);
+        if (podYamlPath == null) {
+            throw new RuntimeException("根据缓存 key 获取 yaml 地址为 null:" + redisKey);
+        }
+        stringRedisTemplate.delete(redisKey);
+        String nodeName = new File(podYamlPath).getName().split("#")[0];
+        String podName = podYamlPath.split("#")[1].split("\\.")[0];
+        stringRedisTemplate.opsForValue().set("multi_project:" + multiCreateYamlRet.getProjectId() + ":sceneId:" + multiCreateYamlRet.getSceneId() + ":pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
+        new Thread(() -> KubernetesUtil.applyYaml(podYamlPath), "apply-" + podName).start();
+    }
+
 
     /**
      * @param redisKey yaml 地址的缓存 key
@@ -399,6 +536,50 @@ public class ProjectDomainService {
         return resultNodeMap;
     }
 
+    public Map<String, Integer> getMultiRemainderNodeMap(String isChoiceGpu) {
+        Map<String, Integer> resultNodeMap = new HashMap<>();    // 用于执行的节点映射(节点名,并行度)
+        if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
+            List<NodeEntity> initialNodeList = kubernetesConfiguration.getMultiGpuNodeList();
+            log.info("多模式仿真预设并行度的GPU节点列表为:" + initialNodeList);
+            for (NodeEntity kubernetesNodeSource : initialNodeList) {
+                NodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
+                String nodeName = kubernetesNodeCopy.getHostname();
+                int maxParallelism = kubernetesNodeCopy.getParallelism();
+                String restParallelismKey = "gpu-node:" + nodeName + ":parallelism";
+                String restParallelismString = stringRedisTemplate.opsForValue().get(restParallelismKey);
+                int restParallelism;
+                if (restParallelismString == null) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
+                    restParallelism = maxParallelism;
+                    stringRedisTemplate.opsForValue().set(restParallelismKey, String.valueOf(restParallelism));
+                } else {
+                    restParallelism = Integer.parseInt(restParallelismString);
+                }
+                resultNodeMap.put(nodeName, restParallelism);
+            }
+            log.info("剩余并行度的GPU节点列表为:" + resultNodeMap);
+        } else if (DictConstants.USE_CPU.equals(isChoiceGpu)) {
+            List<NodeEntity> initialNodeList = kubernetesConfiguration.getMultiCpuNodeList();
+            log.info("多模式仿真预设并行度的CPU节点列表为:" + initialNodeList);
+            for (NodeEntity kubernetesNodeSource : initialNodeList) {
+                NodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
+                String nodeName = kubernetesNodeCopy.getHostname();
+                int maxParallelism = kubernetesNodeCopy.getParallelism();
+                String restParallelismKey = "cpu-node:" + nodeName + ":parallelism";
+                String restParallelismString = stringRedisTemplate.opsForValue().get(restParallelismKey);
+                int restParallelism;
+                if (restParallelismString == null) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
+                    restParallelism = maxParallelism;
+                    stringRedisTemplate.opsForValue().set(restParallelismKey, String.valueOf(restParallelism));
+                } else {
+                    restParallelism = Integer.parseInt(restParallelismString);
+                }
+                resultNodeMap.put(nodeName, restParallelism);
+            }
+            log.info("剩余并行度的CPU节点列表为:" + resultNodeMap);
+        }
+        return resultNodeMap;
+    }
+
 
     /**
      * 根据并行度获取用于执行的节点列表
@@ -510,6 +691,106 @@ public class ProjectDomainService {
 
     }
 
+    public Map<String, Integer> getMultiNodeMapToUse(String isChoiceGpu, int parallelism) {
+        List<NodeEntity> initialNodeList; // 预设并行度的节点列表
+        if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
+            initialNodeList = kubernetesConfiguration.getMultiGpuNodeList();
+            log.info("多模式仿真任务预设并行度的节点列表为:" + initialNodeList);
+            // 遍历所有节点,获取还有剩余并行度的节点
+            List<NodeEntity> restNodeList = new ArrayList<>();    // 剩余并行度的节点列表
+            for (NodeEntity kubernetesNodeSource : initialNodeList) {
+                NodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
+                String nodeName = kubernetesNodeCopy.getHostname();   // 节点名称
+                int maxParallelism = kubernetesNodeCopy.getParallelism();
+                String restParallelismString = stringRedisTemplate.opsForValue().get("gpu-node:" + nodeName + ":parallelism");// 获取节点剩余并行度的 key
+                // -------------------------------- Comment --------------------------------
+                int restParallelism;
+                if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
+                    restParallelism = maxParallelism;
+                    stringRedisTemplate.opsForValue().set("gpu-node:" + nodeName + ":parallelism", String.valueOf(restParallelism));
+                } else {
+                    restParallelism = Integer.parseInt(restParallelismString);
+                    kubernetesNodeCopy.setParallelism(restParallelism);
+                }
+                if (restParallelism > 0) {
+                    restNodeList.add(kubernetesNodeCopy);
+                }
+            }
+            log.info("多模式仿真剩余并行度的节点列表为:" + restNodeList);
+            Map<String, Integer> resultNodeMap = new HashMap<>();    // 用于执行的节点映射(节点名,并行度)
+            for (NodeEntity node: restNodeList) {
+                resultNodeMap.put(node.getHostname(), node.getParallelism());
+            }
+            log.info("即将使用节点的并行度为:" + resultNodeMap);
+            return resultNodeMap;
+        } else if (DictConstants.USE_CPU.equals(isChoiceGpu)) {
+            initialNodeList = kubernetesConfiguration.getMultiCpuNodeList();
+            log.info("多模式仿真预设并行度的节点列表为:" + initialNodeList);
+            // 遍历所有节点,获取还有剩余并行度的节点
+            List<NodeEntity> restNodeList = new ArrayList<>();    // 剩余并行度的节点列表
+            for (NodeEntity kubernetesNodeSource : initialNodeList) {
+                NodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
+                String nodeName = kubernetesNodeCopy.getHostname();   // 节点名称
+                int maxParallelism = kubernetesNodeCopy.getParallelism();
+                String restParallelismString = stringRedisTemplate.opsForValue().get("cpu-node:" + nodeName + ":parallelism");// 获取节点剩余并行度的 key
+                // -------------------------------- Comment --------------------------------
+                int restParallelism;
+                if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
+                    restParallelism = maxParallelism;
+                    stringRedisTemplate.opsForValue().set("cpu-node:" + nodeName + ":parallelism", String.valueOf(restParallelism));
+                } else {
+                    restParallelism = Integer.parseInt(restParallelismString);
+                    kubernetesNodeCopy.setParallelism(restParallelism);
+                }
+                if (restParallelism > 0) {
+                    restNodeList.add(kubernetesNodeCopy);
+                }
+            }
+            log.info("多模式仿真剩余并行度的节点列表为:" + restNodeList);
+            Map<String, Integer> resultNodeMap = new HashMap<>();    // 用于执行的节点映射(节点名,并行度)
+            for (NodeEntity node: restNodeList) {
+                resultNodeMap.put(node.getHostname(), node.getParallelism());
+            }
+            log.info("多模式仿真即将使用节点的并行度为:" + resultNodeMap);
+            return resultNodeMap;
+        } else {
+            throw new RuntimeException("未知是否使用 GPU:" + isChoiceGpu);
+        }
+
+    }
+
+    public void setMultiNodeMapUse(String isChoiceGpu, Map<String, Integer> map) {
+        List<NodeEntity> initialNodeList; // 预设并行度的节点列表
+        if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
+            initialNodeList = kubernetesConfiguration.getMultiGpuNodeList();
+            log.info("多模式仿真任务预设并行度的节点列表为:" + initialNodeList);
+            // 遍历所有节点,获取还有剩余并行度的节点
+            for (NodeEntity kubernetesNodeSource : initialNodeList) {
+                NodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
+                String nodeName = kubernetesNodeCopy.getHostname();   // 节点名称
+                if (map.containsKey(nodeName)){
+                    Integer integer = map.get(nodeName);
+                    stringRedisTemplate.opsForValue().set("gpu-node:" + nodeName + ":parallelism", String.valueOf(integer));
+                }
+            }
+        } else if (DictConstants.USE_CPU.equals(isChoiceGpu)) {
+            initialNodeList = kubernetesConfiguration.getMultiCpuNodeList();
+            log.info("多模式仿真预设并行度的节点列表为:" + initialNodeList);
+            // 遍历所有节点,获取还有剩余并行度的节点
+            for (NodeEntity kubernetesNodeSource : initialNodeList) {
+                NodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
+                String nodeName = kubernetesNodeCopy.getHostname();   // 节点名称
+                if (map.containsKey(nodeName)){
+                    Integer integer = map.get(nodeName);
+                    stringRedisTemplate.opsForValue().set("cpu-node:" + nodeName + ":parallelism", String.valueOf(integer));
+                }
+            }
+        }else {
+            throw new RuntimeException("未知是否使用 GPU:" + isChoiceGpu);
+        }
+
+    }
+
     /**
      * 获取集群剩余并行度
      *
@@ -570,6 +851,61 @@ public class ProjectDomainService {
 
     }
 
+    public int getRemainderMultiParallelism(String isChoiceGpu) {
+        List<NodeEntity> initialNodeList; // 预设并行度的节点列表
+        if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
+            initialNodeList = kubernetesConfiguration.getMultiGpuNodeList(); // 预设并行度的节点列表
+            // 遍历所有节点,获取还有剩余并行度的节点
+            List<NodeEntity> restNodeList = new ArrayList<>();    // 剩余并行度的节点列表
+            for (NodeEntity kubernetesNodeSource : initialNodeList) {
+                NodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
+                String nodeName = kubernetesNodeCopy.getHostname();   // 节点名称
+                int maxParallelism = kubernetesNodeCopy.getParallelism();
+                String restParallelismString = stringRedisTemplate.opsForValue().get("gpu-node:" + nodeName + ":parallelism");// 获取节点剩余并行度的 key
+                // -------------------------------- Comment --------------------------------
+                int restParallelism;
+                if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
+                    restParallelism = maxParallelism;
+                    stringRedisTemplate.opsForValue().set("gpu-node:" + nodeName + ":parallelism", String.valueOf(restParallelism));
+                } else {
+                    restParallelism = Integer.parseInt(restParallelismString);
+                    kubernetesNodeCopy.setParallelism(restParallelism);
+                }
+                if (restParallelism > 0) {
+                    restNodeList.add(kubernetesNodeCopy);
+                }
+            }
+            return restNodeList.size() == 0 ? 0 : restNodeList.stream().mapToInt(NodeEntity::getParallelism).sum();
+        } else if (DictConstants.USE_CPU.equals(isChoiceGpu)) {
+            initialNodeList = kubernetesConfiguration.getMultiCpuNodeList(); // 预设并行度的节点列表
+            // 遍历所有节点,获取还有剩余并行度的节点
+            List<NodeEntity> restNodeList = new ArrayList<>();    // 剩余并行度的节点列表
+            for (NodeEntity kubernetesNodeSource : initialNodeList) {
+                NodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
+                String nodeName = kubernetesNodeCopy.getHostname();   // 节点名称
+                int maxParallelism = kubernetesNodeCopy.getParallelism();
+                String restParallelismString = stringRedisTemplate.opsForValue().get("cpu-node:" + nodeName + ":parallelism");// 获取节点剩余并行度的 key
+                // -------------------------------- Comment --------------------------------
+                int restParallelism;
+                if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
+                    restParallelism = maxParallelism;
+                    stringRedisTemplate.opsForValue().set("cpu-node:" + nodeName + ":parallelism", String.valueOf(restParallelism));
+                } else {
+                    restParallelism = Integer.parseInt(restParallelismString);
+                    kubernetesNodeCopy.setParallelism(restParallelism);
+                }
+                if (restParallelism > 0) {
+                    restNodeList.add(kubernetesNodeCopy);
+                }
+            }
+            log.debug("集群剩余并行度为:" + restNodeList);
+            return restNodeList.size() == 0 ? 0 : restNodeList.stream().mapToInt(NodeEntity::getParallelism).sum();
+        } else {
+            throw new RuntimeException("未知是否使用 GPU:" + isChoiceGpu);
+        }
+
+    }
+
 
     public PrefixEntity getRedisPrefixByUserIdAndProjectIdAndTaskId(String userId, String projectId, String taskId) {
         //3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
@@ -768,7 +1104,7 @@ public class ProjectDomainService {
         }
     }
 
-    private void useLicense(String key, int number) {
+    public void useLicense(String key, int number) {
         final String usingLicense = customRedisClient.get(key);
         if (StringUtil.isEmpty(usingLicense)) {
             customRedisClient.set(key, String.valueOf(number));
@@ -853,6 +1189,11 @@ public class ProjectDomainService {
         return userEntity;
     }
 
+    public UserEntity getUserEntityByMultiProjectId(String projectId) {
+        UserEntity userEntity = userMapper.selectByMultiManualProjectId(projectId);
+        return userEntity;
+    }
+
 
     @Async("pool1")
     @SneakyThrows

+ 13 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/configuration/kubernetes/KubernetesConfiguration.java

@@ -33,6 +33,19 @@ public class KubernetesConfiguration {
     private List<NodeEntity> cpuNodeList;
     private List<NodeEntity> gpuNodeList;
 
+
+    /**
+     * **************************以下是多模式仿真**********************
+     */
+
+    private String multiNamespace;
+
+    private String multiImageVtdGpu;
+
+    private List<NodeEntity> multiGpuNodeList;
+
+    private List<NodeEntity> multiCpuNodeList;
+
     @Bean
     @SneakyThrows
     public ApiClient apiClient() {

+ 53 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/MultiSimulationProjectMapper.java

@@ -0,0 +1,53 @@
+package com.css.simulation.resource.scheduler.infra.db.mysql.mapper;
+
+import api.common.pojo.param.project.MultiSimulationProjectParam;
+import api.common.pojo.po.project.MultiSimulationProjectPO;
+import api.common.pojo.vo.project.MultiSimulationProjectVO;
+import org.apache.ibatis.annotations.*;
+
+import java.util.List;
+
+@Mapper
+public interface MultiSimulationProjectMapper {
+    @Select("<script>" +
+        "select id,project_key,project_name,project_status,create_time from multi_simulation_project where deleted = 0 " +
+        "<if test='projectKey != null'> " +
+        " AND project_key = #{projectKey}" +
+        "</if>" +
+        "<if test='projectName != null'> " +
+        " AND project_name = #{projectName}" +
+        "</if>" +
+        "<if test='projectStatus != null'> " +
+        " AND project_status = #{projectStatus}" +
+        "</if>" +
+        "<if test='projectId != null'> " +
+        " AND project_id = #{projectId}" +
+        "</if>" +
+        "order by create_time desc" +
+        "</script>")
+    List<MultiSimulationProjectVO> selectProjectList(MultiSimulationProjectParam param);
+
+
+    @Insert("insert into multi_simulation_project (id,project_key,project_name,project_description,project_max_seconds,project_status) values" +
+        "(#{id},#{projectKey},#{projectName},#{projectDescription},#{projectMaxSeconds},#{projectStatus})")
+    int  addMultiSimulationProject(MultiSimulationProjectPO projectPO);
+
+    @Select("select id,project_key,project_name,project_status,create_time from multi_simulation_project where deleted = 0 " +
+        "and project_name = #{projectName} limit 1")
+    MultiSimulationProjectVO selectMultiSimulationProjectByName(@Param("projectName") String projectName);
+
+    @Delete("delete from multi_simulation_project where id = #{id}")
+    int deleteMultiSimulationProject(@Param("id") String id);
+
+    @Update("update multi_simulation_project set project_name = #{projectName}, project_description = #{projectDescription}," +
+        "project_max_seconds = #{projectMaxSeconds} where id = #{projectId}")
+    int updateMultiSimulationProject(MultiSimulationProjectParam param);
+
+    @Update("update multi_simulation_project set project_status = #{projectStatus} " +
+        "where id = #{projectId}")
+    int updateMultiSimulationProjectStatus(MultiSimulationProjectParam param);
+
+    @Select("select id,project_key,project_name,project_status,create_time,project_description from multi_simulation_project where deleted = 0 " +
+        "and id = #{id} limit 1")
+    MultiSimulationProjectVO selectMultiSimulationProjectById(@Param("id") String projectId);
+}

+ 27 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/MultiSimulationProjectResultMapper.java

@@ -0,0 +1,27 @@
+package com.css.simulation.resource.scheduler.infra.db.mysql.mapper;
+
+import api.common.pojo.param.project.MultiSimulationProjectResultParam;
+import api.common.pojo.po.project.MultiSimulationProjectResultPO;
+import api.common.pojo.vo.project.MultiSimulationProjectResultVO;
+import org.apache.ibatis.annotations.Insert;
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Select;
+
+import java.util.List;
+
+@Mapper
+public interface MultiSimulationProjectResultMapper {
+    @Select("<script>" +
+        "select id,scene_id,abnormal_type,abnormal_time from multi_simulation_project_result where deleted = 0 " +
+        "and scene_id = #{sceneId}" +
+        "order by abnormal_time,create_time desc" +
+        "</script>")
+    List<MultiSimulationProjectResultVO> selectProjectResultList(MultiSimulationProjectResultParam param);
+
+    @Insert("<script>" +
+        "insert into multi_simulation_project_result (id,scene_id,abnormal_type,abnormal_time_description) values" +
+        "(#{id},#{sceneId},#{abnormalType},#{abnormalTimeDescription})" +
+        "</script>")
+    int insertProjectResult(MultiSimulationProjectResultPO param);
+
+}

+ 36 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/MultiSimulationProjectTaskRecordMapper.java

@@ -0,0 +1,36 @@
+package com.css.simulation.resource.scheduler.infra.db.mysql.mapper;
+
+import api.common.pojo.po.project.MultiSimulationProjectTaskRecordPO;
+import org.apache.ibatis.annotations.*;
+
+import java.util.List;
+
+@Mapper
+public interface MultiSimulationProjectTaskRecordMapper {
+    @Insert("insert into multi_simulation_project_task_record (id,scene_id,project_id,task_body,status,sort) values" +
+        "(#{id},#{sceneId},#{projectId},#{taskBody},0,#{sort})")
+    int addMultiSimulationProjectTaskRecord(@Param("id") String id, @Param("sceneId") String sceneId, @Param("projectId") String projectId, @Param("taskBody") String taskBody, @Param("sort") Integer sort);
+
+    @Update("update multi_simulation_project_task_record set status = #{status},deleted = #{deleted} " +
+        "where id = #{taskId}")
+    int updateMultiSimulationProjectTaskRecordStatus(@Param("status") Integer status, @Param("taskId") String taskId, @Param("deleted") Integer deleted);
+
+    @Select("select id,project_id,scene_id,task_body,status,sort multi_simulation_project_task_record where project_id = #{projectId} and deleted = 0")
+    List<MultiSimulationProjectTaskRecordPO> selectMultiSimulationProjectTaskRecordList(@Param("projectId") String projectId);
+
+    @Select("select id,project_id,scene_id,task_body,status,sort multi_simulation_project_task_record where id = #{taskId} and deleted = 0")
+    MultiSimulationProjectTaskRecordPO selectMultiSimulationProjectTaskRecordById(@Param("taskId") String taskId);
+
+//    @Select("select * from multi_simulation_project where deleted = 0 " +
+//        "and id = #{id}")
+//    MultiSimulationProjectTaskRecordPO selectMultiSimulationTaskByTaskId(@Param("id") String id);
+//
+//    @Select("select * from multi_simulation_project where deleted = 0 " +
+//        "and scene_id = #{sceneId}")
+//    MultiSimulationProjectTaskRecordPO selectMultiSimulationTaskBySceneId(@Param("sceneId") String sceneId);
+//
+//    @Select("select * from multi_simulation_project where deleted = 0 " +
+//        "and project_id = #{projectId} order by sort,create_time asc")
+//    List<MultiSimulationProjectTaskRecordPO> selectMultiSimulationTaskProjectId(@Param("projectId") String projectId);
+
+}

+ 48 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/MultiSimulationSceneCarMapper.java

@@ -0,0 +1,48 @@
+package com.css.simulation.resource.scheduler.infra.db.mysql.mapper;
+
+import api.common.pojo.param.project.MultiSimulationSceneCarParam;
+import api.common.pojo.vo.project.MultiSimulationSceneCarVO;
+import org.apache.ibatis.annotations.*;
+
+import java.util.List;
+
+@Mapper
+public interface MultiSimulationSceneCarMapper {
+    @Insert("insert into multi_simulation_scene_car (id,scene_id,car_id,algorithm_id,algorithm_type,path_id,path_start,path_end,path_start_point,path_end_point,car_sort,is_simulation_car) values" +
+        "(#{sceneCarId},#{sceneId},#{carId},#{algorithmId},#{algorithmType},#{pathId},#{pathStart},#{pathEnd},#{pathStartPoint},#{pathEndPoint},#{carSort},#{isSimulationCar})")
+    int addMultiSimulationSceneCar(MultiSimulationSceneCarParam param);
+
+    @Select("select count(*) from multi_simulation_scene_car where scene_id = #{sceneId} and deleted = 0")
+    int selectSceneCarNumsBySceneId(@Param("sceneId") String sceneId);
+
+
+    @Update("<script>" +
+        "update multi_simulation_scene_car set car_id = #{carId},algorithm_id = #{algorithmId}, algorithm_type = #{algorithmType}, path_id = #{pathId}," +
+        "path_start = #{pathStart},path_end = #{pathEnd},path_start_point = #{pathStartPoint},path_end_point = #{pathEndPoint},car_sort = #{carSort} " +
+        "where id = #{sceneCarId}" +
+        "</script>")
+    int updateMultiSimulationSceneCar(MultiSimulationSceneCarParam param);
+
+    @Update("<script>" +
+        "update multi_simulation_scene_car set is_simulation_car = #{isSimulationCar} " +
+        "where id = #{id}" +
+        "</script>")
+    int updateMultiSimulationSceneCarView(@Param("id") String id, @Param("isSimulationCar") Integer isSimulationCar);
+
+    @Update("update multi_simulation_scene_car set deleted = 1 where id = #{sceneCarId}")
+    int deleteMultiSimulationSceneCar(MultiSimulationSceneCarParam param);
+//
+    @Select("<script>" +
+        "select id,scene_id,car_id,algorithm_id,algorithm_type,path_id,path_start,path_end,path_start_point,path_end_point,car_sort,create_time from multi_simulation_scene_car where deleted = 0 " +
+        "and scene_id = #{sceneId}" +
+        "order by car_sort, create_time desc" +
+        "</script>")
+    List<MultiSimulationSceneCarVO> selectSceneCarList(MultiSimulationSceneCarParam param);
+
+    @Select("<script>" +
+        "select id,scene_id,car_id,algorithm_id,algorithm_type,path_id,path_start,path_end,path_start_point,path_end_point,car_sort,create_time from multi_simulation_scene_car where deleted = 0 " +
+        "and id = #{sceneCarId}" +
+        "</script>")
+    MultiSimulationSceneCarVO selectSceneCarById(MultiSimulationSceneCarParam param);
+
+}

+ 43 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/MultiSimulationSceneMapper.java

@@ -0,0 +1,43 @@
+package com.css.simulation.resource.scheduler.infra.db.mysql.mapper;
+
+import api.common.pojo.param.project.MultiSimulationSceneParam;
+import api.common.pojo.po.project.MultiSimulationScenePO;
+import api.common.pojo.vo.project.MultiSimulationSceneVO;
+import org.apache.ibatis.annotations.*;
+
+import java.util.List;
+
+@Mapper
+public interface MultiSimulationSceneMapper {
+    @Insert("insert into multi_simulation_scene (id,project_id) values" +
+        "(#{id},#{projectId})")
+    int addMultiSimulationScene(MultiSimulationScenePO scenePO);
+
+    @Insert("insert into multi_simulation_scene (id,project_id, scene_name,map_id) values" +
+        "(#{id},#{projectId},#{sceneName},#{mapId})")
+    int addMultiSimulationSceneAll(MultiSimulationScenePO scenePO);
+
+    @Update("update multi_simulation_scene set map_id = #{mapId}" +
+        " where id = #{sceneId}")
+    int updateMultiSimulationSceneMapId(@Param("mapId")String mapId, @Param("sceneId")String sceneId);
+
+    @Select("select count(*) from multi_simulation_scene where project_id = #{projectId} and deleted = 0")
+    int selectSceneNumsByProjectId(@Param("projectId") String projectId);
+
+    @Update("update multi_simulation_scene set deleted = 1 where id = #{sceneId}")
+    int deleteMultiSimulationScene(MultiSimulationSceneParam param);
+
+    @Select("<script>" +
+        "select id,project_id,scene_name,map_id,create_time from multi_simulation_scene where deleted = 0 " +
+        "and project_id = #{projectId}" +
+        "order by create_time desc" +
+        "</script>")
+    List<MultiSimulationSceneVO> selectSceneList(MultiSimulationSceneParam param);
+
+    @Select("<script>" +
+        "select id,project_id,scene_name,map_id,create_time,project_result_overall_url,project_result_simulation_url from multi_simulation_scene where deleted = 0 " +
+        "and id = #{sceneId}" +
+        "</script>")
+    MultiSimulationSceneVO selectSceneDetailBySceneId(@Param("sceneId") String sceneId);
+
+}

+ 46 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/SimulationMapMapper.java

@@ -0,0 +1,46 @@
+package com.css.simulation.resource.scheduler.infra.db.mysql.mapper;
+
+import api.common.pojo.param.map.SimulationMapParam;
+import api.common.pojo.po.map.SimulationMapPO;
+import api.common.pojo.vo.map.SimulationMapVO;
+import org.apache.ibatis.annotations.*;
+
+import java.util.List;
+
+@Mapper
+public interface SimulationMapMapper {
+    @Insert("insert into simulation_map (id,map_code,map_name,map_json,map_osgb_path,map_path,map_description,path_num) values" +
+        "(#{id},#{mapCode},#{mapName},#{mapJson},#{mapOsgbPath},#{mapPath},#{mapDescription},#{pathNum})")
+    int addSimulationMap(SimulationMapPO scenePO);
+
+    @Select("select id,path_num,map_name,map_osgb_path,map_path,map_description,create_time from simulation_map where id = #{id} and deleted = 0")
+    SimulationMapVO selectMapByMapId(@Param("id") String mapId);
+
+    @Update("update simulation_map set deleted = 1 where id = #{mapId}")
+    int deleteMultiSimulationScene(SimulationMapParam param);
+
+    @Update("update simulation_map set map_path = #{mapPath},map_description = #{mapDescription} where id = #{mapId}")
+    int updateMultiSimulationScene(SimulationMapParam param);
+
+
+//
+//    @Select("<script>" +
+//        "select id,project_id,scene_name,map_id,create_time from multi_simulation_scene where deleted = 0 " +
+//        "and project_id = #{projectId}" +
+//        "order by create_time desc" +
+//        "</script>")
+//    List<MultiSimulationSceneVO> selectSceneList(MultiSimulationSceneParam param);
+
+    @Select("<script>" +
+        "select id,map_name,map_description,path_num,create_time from simulation_map where deleted = 0 " +
+        "<if test='mapName != null'> " +
+        " AND map_name = #{mapName}" +
+        "</if>" +
+        "<if test='mapDescription != null'> " +
+        " AND map_description = #{mapDescription}" +
+        "</if>" +
+        "order by create_time desc" +
+        "</script>")
+    List<SimulationMapVO> selectSimulationMapList(SimulationMapParam param);
+
+}

+ 42 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/SimulationMapPathMapper.java

@@ -0,0 +1,42 @@
+package com.css.simulation.resource.scheduler.infra.db.mysql.mapper;
+
+import api.common.pojo.param.map.SimulationMapPathParam;
+import api.common.pojo.po.map.SimulationMapPathPO;
+import api.common.pojo.vo.map.SimulationMapPathVO;
+import org.apache.ibatis.annotations.Insert;
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Select;
+
+import java.util.List;
+
+@Mapper
+public interface SimulationMapPathMapper {
+    @Insert("insert into simulation_map_path (id,map_id,path_sort,path_detail,path_start_point,path_end_point,start_point_num,end_point_num) values" +
+        "(#{id},#{mapId},#{pathSort},#{pathDetail},#{pathStartPoint},#{pathEndPoint},#{startPointNum},#{endPointNum})")
+    int addSimulationMap(SimulationMapPathPO simulationMapPathPO);
+
+//    @Select("select id,path_num,map_name,map_description,create_time from simulation_map where id = #{id} and deleted = 0")
+//    SimulationMapVO selectMapByMapId(@Param("id") String mapId);
+//
+//    @Update("update simulation_map set deleted = 1 where id = #{mapId}")
+//    int deleteMultiSimulationScene(SimulationMapParam param);
+//
+//    @Update("update simulation_map set map_path = #{mapPath},map_description = #{mapDescription} where id = #{mapId}")
+//    int updateMultiSimulationScene(SimulationMapParam param);
+
+
+//
+//    @Select("<script>" +
+//        "select id,project_id,scene_name,map_id,create_time from multi_simulation_scene where deleted = 0 " +
+//        "and project_id = #{projectId}" +
+//        "order by create_time desc" +
+//        "</script>")
+//    List<MultiSimulationSceneVO> selectSceneList(MultiSimulationSceneParam param);
+
+    @Select("<script>" +
+        "select id,map_id,path_sort,path_detail,path_start_point,path_end_point,start_point_num,end_point_num from simulation_map_path where deleted = 0 and map_id =#{mapId} " +
+        "order by path_sort asc" +
+        "</script>")
+    List<SimulationMapPathVO> selectSimulationMapPathList(SimulationMapPathParam param);
+
+}

+ 9 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/UserMapper.java

@@ -32,6 +32,15 @@ public interface UserMapper {
             "where su.id = (select smp.create_user_id from simulation_manual_project smp where smp.id = #{projectId})\n")
     UserEntity selectByManualProjectId(@Param("projectId") String projectId);
 
+    @ResultMap("user")
+    @Select("select su.id,\n" +
+        "       su.role_code,\n" +
+        "       su.use_type,\n" +
+        "       su.create_user_id\n" +
+        "from system_user su\n" +
+        "where su.id = (select smp.project_user_id from multi_simulation_project smp where smp.id = #{projectId})\n")
+    UserEntity selectByMultiManualProjectId(@Param("projectId") String projectId);
+
     @ResultMap("user")
     @Select("select su.id,\n" +
             "       su.role_code,\n" +

+ 20 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/entity/MultiInfoEntity.java

@@ -0,0 +1,20 @@
+package com.css.simulation.resource.scheduler.infra.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MultiInfoEntity {
+
+    private String project_id;
+    private String task_id;
+    private String task_path;
+    private Long default_time;
+
+    private String scene_id;
+}

+ 24 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/entity/MultiTaskMessageEntity.java

@@ -0,0 +1,24 @@
+package com.css.simulation.resource.scheduler.infra.entity;
+
+import com.css.simulation.resource.scheduler.app.entity.VehicleEntity;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ *
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MultiTaskMessageEntity {
+
+    private MultiInfoEntity info;
+    private ScenarioEntity scenario;
+    private VehicleEntity vehicleEntity;
+
+}

+ 22 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/fs/minio/MinioUtil.java

@@ -14,6 +14,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.security.InvalidKeyException;
 import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
 
 @Slf4j
 public class MinioUtil {
@@ -39,6 +41,26 @@ public class MinioUtil {
         log.info("rmR() 删除 minio 目录 " + bucketName + "/" + prefix + "完成。");
     }
 
+    // 获取所有文件,过滤调文件夹
+    @SneakyThrows
+    public static List<String> listAllFileName(MinioClient minioClient, String bucketName, String prefix) {
+        Iterable<Result<Item>> list = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName)
+            .prefix(prefix).recursive(true).build());
+        if (list == null || !list.iterator().hasNext()) {
+            log.info(prefix + " 不存在。");
+            return new ArrayList<>();
+        }
+        List<String> res = new ArrayList<>();
+        for (Result<Item> object : list) {
+            if (object.get().isDir()){
+                continue;
+            }
+            String name = object.get().objectName();
+            res.add(name);
+        }
+        return res;
+    }
+
     /**
      * 删除文件
      *