LingxinMeng 2 年之前
父节点
当前提交
f13df0bb9a

+ 4 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/configuration/custom/CustomConfiguration.java

@@ -17,4 +17,8 @@ public class CustomConfiguration {
     private String temporaryDirectory;
     private String uploadOsgbUrl;
     private String generateVideoUrl;
+    private String algorithmPlatformAppid;
+    private String algorithmPlatformSecret;
+    private String algorithmPlatformTokenUri;
+    private String algorithmPlatformAlgorithmAddrUri;
 }

+ 5 - 485
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -1,523 +1,43 @@
 package com.css.simulation.resource.scheduler.consumer;
 
 
-import api.common.pojo.constants.DictConstants;
 import api.common.pojo.dto.ProjectMessageDTO;
-import api.common.util.*;
-import com.css.simulation.resource.scheduler.data.entity.*;
-import com.css.simulation.resource.scheduler.data.model.DynamicsModel;
-import com.css.simulation.resource.scheduler.data.model.VehicleModel;
-import com.css.simulation.resource.scheduler.mapper.*;
-import com.css.simulation.resource.scheduler.service.ProjectManager;
+import api.common.util.JsonUtil;
 import com.css.simulation.resource.scheduler.service.ProjectService;
-import com.css.simulation.resource.scheduler.util.ApacheKafkaUtil;
-import com.css.simulation.resource.scheduler.util.MinioUtil;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
-import com.css.simulation.resource.scheduler.util.TaskUtil;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
 
 @Component
 @Slf4j
 public class ProjectConsumer {
-    @Value("${scheduler.linux-path.temp}")
-    private String linuxTempPath;
-    @Value("${scheduler.minio-path.project-result}")
-    private String projectResultPathOfMinio;
-    @Value("${minio.bucket-name}")
-    private String bucketName;
-
-    // -------------------------------- Comment --------------------------------
-    @Resource
-    private MinioClient minioClient;
-    @Resource
-    private StringRedisTemplate stringRedisTemplate;
-    @Resource
-    private ManualProjectMapper manualProjectMapper;
-    @Resource
-    private AutoSubProjectMapper autoSubProjectMapper;
-    @Resource
-    private VehicleMapper vehicleMapper;
-    @Resource
-    private SensorCameraMapper sensorCameraMapper;
-    @Resource
-    private SensorOgtMapper sensorOgtMapper;
-    @Resource
-    private AlgorithmMapper algorithmMapper;
-    @Resource
-    private UserMapper userMapper;
-    @Resource
-    private ClusterMapper clusterMapper;
     @Resource
     private ProjectService projectService;
     @Resource
     private ProjectUtil projectUtil;
-    @Resource
-    private IndexMapper indexMapper;
-    @Resource
-    private TaskMapper taskMapper;
-    @Resource
-    private TaskUtil taskUtil;
-    @Resource
-    private ProjectManager projectManager;
-    @Resource
-    private KafkaTemplate<String, String> kafkaTemplate;
-    @Resource(name = "myKafkaAdmin")
-    private Admin kafkaAdminClient;
+
 
     /**
-     * 接收到运行信息立即复制一份数据作为运行数据
-     *
      * @param projectRecord 项目启动消息
      */
-    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.start-topic}")
+    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${custom.mq-start-project-topic}")
     @SneakyThrows
     public void acceptMessage(ConsumerRecord<String, String> projectRecord) {
         final ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectRecord.value(), ProjectMessageDTO.class);
         log.info("接收到项目开始消息为:" + projectMessageDTO);
-        new Thread(() -> createTaskAndFixData(projectMessageDTO), "fix-" + projectMessageDTO.getProjectId()).start();
-    }
-
-
-    public void createTaskAndFixData(ProjectMessageDTO projectMessageDTO) {
-        //* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
-        String projectId = projectMessageDTO.getProjectId();                // 手动执行项目 id 或 自动执行子项目 id
-        String projectType = projectMessageDTO.getType();                   // 项目类型
-        String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
-        try {
-            String modelType = projectMessageDTO.getModelType();                // 模型类型,1 动力学模型 2 carsim模型
-            String packageId = projectMessageDTO.getScenePackageId();           // 场景测试包 id
-            String vehicleConfigId = projectMessageDTO.getVehicleConfigId();    // 模型配置 id
-            String algorithmId = projectMessageDTO.getAlgorithmId();            // 模型配置 id
-            long videoTime = projectMessageDTO.getMaxSimulationTime();          // 结果视频的时长
-            String userId = projectUtil.getUserIdByProjectIdAndProjectType(projectId, projectType);
-            String projectPath = linuxTempPath + "project/" + projectId + "/";
-            FileUtil.mkdir(projectPath);
-            //5 将该 project 下所有旧的指标得分删除。
-            taskMapper.deleteByProject(projectId);
-            indexMapper.deleteFirstTargetScoreByProjectId(projectId);
-            indexMapper.deleteLastTargetScoreByProjectId(projectId);
-            // -------------------------------- 1 查询场景 --------------------------------
-            log.info("项目 " + projectId + " 开始查询场景。");
-            //根据场景测试包 packageId,拿到场景集合(包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
-            List<SceneEntity> sceneEntityList = projectService.getSceneList(projectId, packageId);
-            int taskTotal = sceneEntityList.size();
-            projectMessageDTO.setTaskTotal(taskTotal);
-            projectMessageDTO.setTaskCompleted(0);
-            //去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
-            Set<SceneEntity> sceneEntitySet = new HashSet<>(sceneEntityList);
-            log.info("项目 " + projectId + " 场景包括:" + sceneEntitySet);
-            // -------------------------------- 2 算法导入 --------------------------------
-            log.info("项目 " + projectId + " 开始算法导入。");
-            String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
-            log.info("项目 " + projectId + " 算法已导入:" + algorithmDockerImage);
-            // -------------------------------- 3 查询模型 --------------------------------
-            if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
-                log.debug("项目 " + projectId + " 开始查询模型。");
-                //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
-                VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
-                List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
-                List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
-                // -------------------------------- 4 保存任务消息 --------------------------------
-                log.debug("项目 " + projectId + " 开始保存任务消息。");
-                List<TaskEntity> taskList = new ArrayList<>();
-                for (SceneEntity sceneEntity : sceneEntitySet) {
-                    String sceneId = sceneEntity.getId();
-                    //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
-                    List<String> lastTargetIdList = null;
-                    if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-                        lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
-                    } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-                        lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
-                    }
-                    if (CollectionUtil.isEmpty(lastTargetIdList)) {
-                        throw new RuntimeException("项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
-                    }
-                    for (String lastTargetId : lastTargetIdList) {
-                        String taskId = StringUtil.getRandomUUID();
-                        // 保存任务信息
-                        TaskEntity taskEntity = TaskEntity.builder() // run_start_time 和 run_end_time 不填
-                                .id(taskId).pId(projectId).sceneId(sceneId).lastTargetId(lastTargetId).sceneName(sceneEntity.getName()).sceneType(sceneEntity.getType()).runState(DictConstants.TASK_PENDING).runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId).build();
-                        taskEntity.setCreateTime(TimeUtil.getNowForMysql());
-                        taskEntity.setCreateUserId(userId);
-                        taskEntity.setModifyTime(TimeUtil.getNowForMysql());
-                        taskEntity.setModifyUserId(userId);
-                        taskEntity.setModifyTime(TimeUtil.getNowForMysql());
-                        taskEntity.setIsDeleted("0");
-                        taskList.add(taskEntity);
-                        // 将 xosc、xodr、osgb 全部上传到仿真结果路径
-                        String scenarioOsc = sceneEntity.getScenarioOsc();
-                        String[] splitXosc = scenarioOsc.split("/");
-                        String xoscName = splitXosc[splitXosc.length - 1];
-                        String[] xoscNameSplit = xoscName.split("\\.");
-                        String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
-                        String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
-                        String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
-                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
-                        MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
-                        FileUtil.rm(xoscPathOfLinux);   // 删除临时文件
-
-                        String scenarioOdr = sceneEntity.getScenarioOdr();
-                        String[] splitXodr = scenarioOdr.split("/");
-                        String xodrName = splitXodr[splitXodr.length - 1];
-                        String[] xodrNameSplit = xodrName.split("\\.");
-                        String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
-                        String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
-                        String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
-                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
-                        MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
-                        FileUtil.rm(xodrPathOfLinux);   // 删除临时文件
-
-                        String scenarioOsgb = sceneEntity.getScenarioOsgb();
-                        String[] splitOsgb = scenarioOsgb.split("/");
-                        String osgbName = splitOsgb[splitOsgb.length - 1];
-                        String[] osgbNameSplit = osgbName.split("\\.");
-                        String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
-                        String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
-                        String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
-                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
-                        MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
-                        FileUtil.rm(osgbPathOfLinux);   // 删除临时文件
-
-                        // 组装 task 消息
-                        TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleModel.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(DynamicsModel.builder().dynamics_maxspeed(vehicleEntity.getMaxSpeed()).dynamics_enginepower(vehicleEntity.getEnginePower()).dynamics_maxdecel(vehicleEntity.getMaxDeceleration()).dynamics_maxsteering(vehicleEntity.getMaxSteeringAngle()).dynamics_mass(vehicleEntity.getMass()).dynamics_frontsurfaceeffective(vehicleEntity.getFrontSurfaceEffective()).dynamics_airdragcoefficient(vehicleEntity.getAirDragCoefficient()).dynamics_rollingresistance(vehicleEntity.getRollingResistanceCoefficient()).dynamics_wheeldiameter(vehicleEntity.getWheelDiameter()).dynamics_wheeldrive(vehicleEntity.getWheelDrive()).dynamics_overallefficiency(vehicleEntity.getOverallEfficiency()).dynamics_distfront(vehicleEntity.getFrontDistance()).dynamics_distrear(vehicleEntity.getRearDistance()).dynamics_distleft(vehicleEntity.getLeftDistance()).dynamics_distright(vehicleEntity.getRightDistance()).dynamics_distheight(vehicleEntity.getHeightDistance()).dynamics_wheelbase(vehicleEntity.getWheelbase()).build()).sensors(SensorsEntity.builder()   // 根据 vehicleId 查询绑定的传感器列表
-                                .camera(cameraEntityList).OGT(ogtEntityList).build()).build()).build();
-                        FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
-                        log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
-                    }
-                }
-                taskUtil.batchInsertTask(taskList);
-                log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
-            } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
-                log.debug("项目 " + projectId + " 开始查询模型。");
-                VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
-                List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
-                List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
-                // -------------------------------- 4 保存任务消息 --------------------------------
-                log.debug("项目 " + projectId + " 开始保存任务消息。");
-                List<TaskEntity> taskList = new ArrayList<>();
-                for (SceneEntity sceneEntity : sceneEntitySet) {
-                    String sceneId = sceneEntity.getId();
-                    //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
-                    List<String> lastTargetIdList = null;
-                    if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-                        lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
-                    } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-                        lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
-                    }
-                    if (CollectionUtil.isEmpty(lastTargetIdList)) {
-                        throw new RuntimeException("项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
-                    }
-                    for (String lastTargetId : lastTargetIdList) {
-                        String taskId = StringUtil.getRandomUUID();
-                        // 保存任务信息
-                        TaskEntity taskEntity = TaskEntity.builder() // run_start_time 和 run_end_time 不填
-                                .id(taskId).pId(projectId).sceneId(sceneId).lastTargetId(lastTargetId).sceneName(sceneEntity.getName()).sceneType(sceneEntity.getType()).runState(DictConstants.TASK_PENDING).runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId).build();
-                        taskEntity.setCreateTime(TimeUtil.getNowForMysql());
-                        taskEntity.setCreateUserId(userId);
-                        taskEntity.setModifyTime(TimeUtil.getNowForMysql());
-                        taskEntity.setModifyUserId(userId);
-                        taskEntity.setModifyTime(TimeUtil.getNowForMysql());
-                        taskEntity.setIsDeleted("0");
-                        taskList.add(taskEntity);
-                        // 将 xosc、xodr、osgb 全部上传到仿真结果路径
-                        String scenarioOsc = sceneEntity.getScenarioOsc();
-                        String[] splitXosc = scenarioOsc.split("/");
-                        String xoscName = splitXosc[splitXosc.length - 1];
-                        String[] xoscNameSplit = xoscName.split("\\.");
-                        String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
-                        String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
-                        String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
-                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
-                        MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
-
-                        String scenarioOdr = sceneEntity.getScenarioOdr();
-                        String[] splitXodr = scenarioOdr.split("/");
-                        String xodrName = splitXodr[splitXodr.length - 1];
-                        String[] xodrNameSplit = xodrName.split("\\.");
-                        String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
-                        String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
-                        String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
-                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
-                        MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
-
-                        String scenarioOsgb = sceneEntity.getScenarioOsgb();
-                        String[] splitOsgb = scenarioOsgb.split("/");
-                        String osgbName = splitOsgb[splitOsgb.length - 1];
-                        String[] osgbNameSplit = osgbName.split("\\.");
-                        String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
-                        String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
-                        String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
-                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
-                        MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
-                        log.info("已经将 xosc、xodr、osgb 上传到 minio 的结果文件目录:" + projectResultPathOfMinio);
-
-                        // 组装 task 消息
-                        // carsim 不需要查询模型参数
-                        TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleModel.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(null).sensors(SensorsEntity.builder()   // 根据 vehicleId 查询绑定的传感器列表
-                                .camera(cameraEntityList).OGT(ogtEntityList).build()).build()).build();
-
-                        FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
-                        log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
-                    }
-                }
-                taskUtil.batchInsertTask(taskList);
-                log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
-            }
-
-            //* -------------------------------- 4 开始排队 --------------------------------
-            cacheProject(projectMessageDTO);
-        } catch (Exception e) {
-            log.error("项目报错。", e);
-            projectService.stopProject(isChoiceGpu, projectId, projectType, e.getMessage());
-            throw new RuntimeException(e);
-        }
-
-    }
-
-
-    /**
-     * 任务运行前首先判断用户是否拥有可分配资源
-     *
-     * @param projectMessageDTO 项目启动消息
-     */
-    @SneakyThrows
-    public void cacheProject(ProjectMessageDTO projectMessageDTO) {
-        log.debug("判断用户是否拥有可分配资源:" + projectMessageDTO);
-        //1 读取 kafka 的 project 信息
-        final String modelType = projectMessageDTO.getModelType();
-        String projectId = projectMessageDTO.getProjectId();    // 手动执行项目 id 或 自动执行子项目 id
-        long parallelism = projectMessageDTO.getParallelism();   // 项目并行度
-        String projectType = projectMessageDTO.getType(); // 项目类型
-        //2 获取用户信息(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
-        final UserEntity userEntity = projectUtil.getUserEntityByProjectIdAndProjectType(projectId, projectType);
-        String projectUserId = userEntity.getId();
-        log.debug("项目 " + projectId + " 的创建人为:" + userEntity);
-        String roleCode = userEntity.getRoleCode();
-        String useType = userEntity.getUseType();
-        ClusterEntity clusterEntity;
-        String clusterUserId;  // 项目实际运行使用的用户集群
-        if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
-            clusterUserId = DictConstants.SYSTEM_CLUSTER_ID;
-            log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
-            PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
-            run(projectMessageDTO, clusterUserId, modelType, DictConstants.SYSTEM_USER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
-            return;
-        } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
-            clusterUserId = projectUserId;
-            clusterEntity = clusterMapper.selectByUserId(clusterUserId);
-            log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterEntity);
-        } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
-            if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) {   //3-3 普通子账户,根据自己的独占节点排队
-                clusterUserId = projectUserId;
-                clusterEntity = clusterMapper.selectByUserId(clusterUserId);
-                log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通独占子账户(自己的集群),集群为:" + clusterEntity);
-            } else if (DictConstants.USER_TYPE_PUBLIC.equals(useType)) {    //3-4 共享子账户,根据父账户的共享节点排队
-                clusterUserId = userEntity.getCreateUserId();
-                clusterEntity = clusterMapper.selectByUserId(clusterUserId);
-                log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通共享子账户(父账户的集群),集群为:" + clusterEntity);
-            } else {
-                throw new RuntimeException("用户" + projectUserId + "未知占用类型:" + useType);
-            }
-        } else {
-            throw new RuntimeException("未知角色类型:" + roleCode);
-        }
-        PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterEntity.getId(), projectId);
-        final Integer usingSimulationLicenseNumber = projectUtil.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_SIMULATION);
-        final Integer usingDynamicLicenseNumber;
-        final Integer numSimulationLicense = clusterEntity.getNumSimulationLicense();
-        final Integer numDynamicLicense = clusterEntity.getNumDynamicLicense();
-        final String clusterId = clusterEntity.getId();
-        if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
-            // 获取仿真软件证书数量和动力学软件证书数量(vtd占一个仿真证书,carsim各占一个)
-//            // 获取正在运行的项目的并行度总和
-//            int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
-            // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
-            if (usingSimulationLicenseNumber + parallelism <= numSimulationLicense) {
-                run(projectMessageDTO, clusterUserId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
-            } else {
-                log.info("VTD 项目 {} 并行度为 {},用户 {} 的集群 {} 的仿真证书总数量为 {},已使用数量为 {}。该项目加入等待队列。 ", projectId, parallelism, clusterUserId, clusterId, numSimulationLicense, usingSimulationLicenseNumber);
-                wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
-            }
-        } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
-            // 获取仿真软件证书数量和动力学软件证书数量(vtd占一个仿真证书,carsim各占一个)
-//                // 获取正在运行的项目的并行度总和
-//                int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
-            // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
-            usingDynamicLicenseNumber = projectUtil.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
-            if (usingSimulationLicenseNumber + parallelism <= numSimulationLicense && usingDynamicLicenseNumber + parallelism <= numDynamicLicense) {
-                run(projectMessageDTO, clusterUserId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
-            } else {
-                log.info("CARSIM 项目 {} 并行度为 {},用户 {} 的集群 {} 的仿真证书总数量为 {},已使用数量为 {},动力学证书总数量为 {},已使用数量为 {}。该项目加入等待队列。 ", projectId, parallelism, clusterUserId, clusterId, numSimulationLicense, usingSimulationLicenseNumber, numDynamicLicense, usingDynamicLicenseNumber);
-                wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
-            }
-        } else {
-            throw new RuntimeException("未知模型类型:" + modelType);
-        }
-    }
-
-    //* -------------------------------- 运行 --------------------------------
-
-    /**
-     * @param projectMessageDTO 初始接收到的项目启动信息
-     * @param clusterId         集群ID
-     * @param projectRunningKey projectRunningKey
-     * @param projectWaitingKey projectWaitingKey
-     */
-    public void run(ProjectMessageDTO projectMessageDTO, String clusterUserId, String modelType, String clusterId, String projectRunningKey, String projectWaitingKey) {
-        String projectId = projectMessageDTO.getProjectId();    // 项目 id
-        String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
-        int parallelism = projectMessageDTO.getParallelism();  // 期望并行度
-        //1 获取集群剩余可用并行度
-        int restParallelism = projectUtil.getRestParallelism(isChoiceGpu);
-        //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
-        if (restParallelism > 0L) {
-            log.info("集群 " + clusterId + " 执行项目 " + projectId);
-            if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)) {
-                projectUtil.useLicense(clusterUserId, modelType, parallelism);
-            }
-            // 设置实际的并行度
-            projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism));   // 设置实际的并行度
-            parseProject(projectMessageDTO, projectRunningKey, isChoiceGpu);
-        } else {
-            log.info("服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
-            wait(projectWaitingKey, projectMessageDTO);
-        }
+        new Thread(() -> projectService.createTaskAndFixData(projectMessageDTO), "fix-" + projectMessageDTO.getProjectId()).start();
     }
 
-    /**
-     * @param projectMessageDTO 初始接收到的项目启动信息
-     * @param projectRunningKey projectRunningKey
-     */
-    @SneakyThrows
-    public void parseProject(ProjectMessageDTO projectMessageDTO, String projectRunningKey, String isChoiceGpu) {
-        String projectId = projectMessageDTO.getProjectId();    // 项目 id
-        String modelType = projectMessageDTO.getModelType();
-        String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
-        int currentParallelism = projectMessageDTO.getCurrentParallelism();   // 当前并行度
-        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
-        String projectPath = linuxTempPath + "project/" + projectId + "/";
-        // -------------------------------- 1 获取任务 json 列表 --------------------------------
-        List<String> taskJsonList = FileUtil.listAbsolutePathByTypeAndLength(projectPath, "json", (StringUtil.getRandomUUID() + ".json").length());
-        int taskTotal = taskJsonList.size();
-        projectMessageDTO.setTaskTotal(taskTotal);
-        projectMessageDTO.setTaskCompleted(0);
-        // 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
-        //1 获取剩余并行度和即将使用的各node的并行度
-        Map<String, Integer> nodeMap0 = projectUtil.getNodeMap(isChoiceGpu);
-        Map<String, Integer> nodeMap = projectUtil.getNodeMapToUse(isChoiceGpu, Math.min(currentParallelism, taskTotal));
-        //2 将指定 node 的并行度减少
-        nodeMap.keySet().forEach(nodeName -> projectUtil.decrementParallelism(isChoiceGpu, nodeName, nodeMap.get(nodeName)));
-        // 重新设置实际使用的并行度并保存到 redis
-        int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
-        projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
-        log.info("项目 " + projectId + " 运行在:" + nodeMap);
-        stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
-        //* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
-        String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
-        // -------------------------------- 4 发送任务消息 --------------------------------
-        List<NodeEntity> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
-        int messageNumber = 0;
-        ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1);   // 创建主题
-        TimeUnit.SECONDS.sleep(7);
-        // 需要即时启动的任务(并行度的大小)
-        CopyOnWriteArrayList<String> yamlToRunRedisKeyList = new CopyOnWriteArrayList<>();
-        for (String taskJsonPath : taskJsonList) {
-            String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
-            // 保存运行中的任务信息
-            String taskMessageKey = projectRunningKey + ":task:" + taskId + ":message";
-            String taskJson = FileUtil.read(taskJsonPath);
-            stringRedisTemplate.opsForValue().set(taskMessageKey, taskJson);
-
-            //4-5 将 projectId 作为 topic 名称,根据 parallelism 分散发送 task 信息到 kafka
-            SendResult<String, String> stringStringSendResult = kafkaTemplate.send(projectId, messageNumber % currentParallelism, taskId, taskJson).get();
-            RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
-            String topic = recordMetadata.topic();  // 消息发送到的topic
-            int partition = recordMetadata.partition(); // 消息发送到的分区
-            long offset = recordMetadata.offset();  // 消息在分区内的offset
-            log.info("发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
-            //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
-            // 选一个 count 最少的 node
-            String currentNodeName = "";
-            NodeEntity currentNodeEntity = null;
-            int currentCount = Integer.MAX_VALUE;
-            log.info("各节点已经预定的任务个数为:" + nodeListToCount);
-            for (NodeEntity nodeEntity : nodeListToCount) {
-                int tempCount = nodeEntity.getCount();
-                String tempNodeName = nodeEntity.getNodeName();
-                if (tempCount < currentCount) {
-                    currentCount = tempCount;
-                    currentNodeName = tempNodeName;
-                    currentNodeEntity = nodeEntity;
-                }
-            }
-            if (currentNodeEntity == null) {
-                String errorMessage = "挑选节点失败。";
-                log.info(errorMessage);
-                throw new RuntimeException(errorMessage);
-            }
-            currentNodeEntity.setCount(currentNodeEntity.getCount() + 1);
-            Integer cpuOrder = null;
-            if (currentCount == 0) {
-                // 根据各节点剩余并行度,倒序获取 cpu 编号
-                cpuOrder = nodeMap0.get(currentNodeName) - 1;
-                nodeMap0.put(currentNodeName, cpuOrder);
-            }
-            // 只有准备启动(即 currentCount == 0)的时候才指定 cpu 编号
-            log.info("创建任务 " + taskId + " 的 yaml:是否使用 gpu (0是1否)" + isChoiceGpu + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前节点名称为:" + currentNodeName + ",当前 cpu 编号为:" + cpuOrder);
-            String yamlRedisKey = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
-
-            if (currentCount == 0) {
-                String podName = yamlRedisKey.split(":")[yamlRedisKey.split(":").length - 1];
-                log.info("将 pod 加入到启动列表 " + podName);
-                yamlToRunRedisKeyList.add(yamlRedisKey);
-            }
-            messageNumber++;
-        }
-        TimeUnit.SECONDS.sleep(6);
-        log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
-        for (String redisKey : yamlToRunRedisKeyList) {
-            projectUtil.createPodBegin(projectId, redisKey);
-        }
-        log.info("项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
-    }
-
-
-    //* -------------------------------- 等待 --------------------------------
-
-    /**
-     * @param projectWaitingKey 项目等待 key
-     * @param projectMessageDTO 项目信息
-     */
-    public void wait(String projectWaitingKey, ProjectMessageDTO projectMessageDTO) {
-        stringRedisTemplate.opsForValue().set(projectWaitingKey, JsonUtil.beanToJson(projectMessageDTO));
-    }
     //* -------------------------------- 结束 --------------------------------
 
-    /**
-     * {
-     * "projectId": "sadfasdfs",	// 项目 id
-     * "type": "1",	// 项目类型
-     * }
-     */
-    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")
+    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${custom.mq-stop-project-topic}")
     @SneakyThrows
     public void stopProject(ConsumerRecord<String, String> stopRecord) {
         log.info("接收到的项目终止消息为:" + stopRecord);

+ 9 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/data/entity/ProjectWaitingQueueEntity.java

@@ -0,0 +1,9 @@
+package com.css.simulation.resource.scheduler.data.entity;
+
+import lombok.Data;
+
+@Data
+public class ProjectWaitingQueueEntity {
+    private String projectId;
+    private String waitingType; //1等待扩充 2等待执行
+}

+ 0 - 153
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectManager.java

@@ -1,153 +0,0 @@
-package com.css.simulation.resource.scheduler.service;
-
-import api.common.pojo.constants.DictConstants;
-import api.common.util.FileUtil;
-import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
-import com.css.simulation.resource.scheduler.configuration.minio.MinioConfiguration;
-import com.css.simulation.resource.scheduler.mapper.VehicleMapper;
-import com.css.simulation.resource.scheduler.util.ProjectUtil;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.io.File;
-
-@Component
-@Slf4j
-public class ProjectManager {
-
-    @Value("${scheduler.linux-path.vtd-pod-template-yaml}")
-    private String vtdPodTemplateYaml;
-    @Value("${scheduler.linux-path.carsim-pod-template-yaml}")
-    private String carsimPodTemplateYaml;
-    @Value("${scheduler.simulation-cloud-ip}")
-    private String simulationCloudIp;
-    @Value("${scheduler.linux-path.pod-yaml-directory}")
-    private String podYamlDirectory;
-    @Value("${spring.kafka.bootstrap-servers}")
-    private String kafkaIp;
-    @Resource
-    private KubernetesConfiguration kubernetesConfiguration;
-    @Resource
-    private MinioConfiguration minioConfiguration;
-    @Resource
-    private ProjectUtil projectUtil;
-    @Resource
-    private VehicleMapper vehicleMapper;
-    @Resource
-    private StringRedisTemplate stringRedisTemplate;
-
-
-    /**
-     * 创建一个临时 yaml,node 在最后用 # 号隔开
-     */
-    @SneakyThrows
-    public String createTempYaml(String projectId,
-                                 String vehicleConfigId,
-                                 String modelType,
-                                 String algorithmDockerImage,
-                                 String nodeName,
-                                 int kafkaPartition,
-                                 long kafkaOffset,
-                                 String isChoiceGpu,
-                                 Integer cpuOrder
-
-    ) {
-        String podName = projectUtil.getRandomPodName(projectId);   // 生成 podName
-        String podYaml = projectUtil.getPodYamlName(nodeName, podName);     // 模板文件名称
-        String yamlPath = podYamlDirectory + podYaml;
-        String finalYaml;
-
-        if ("1".equals(modelType)) {
-            String podString = FileUtil.read(new File(vtdPodTemplateYaml));
-            String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
-            String replace1 = replace0.replace("simulation-cloud-ip", simulationCloudIp);
-            String replace2 = replace1.replace("kafka-ip", kafkaIp);
-            String replace3 = replace2.replace("kafka-topic", projectId);     // 消息主题名称为 projectId
-            String replace4 = replace3.replace("kafka-partition", "\"" + kafkaPartition + "\"");
-            String replace5 = replace4.replace("kafka-offset", "\"" + kafkaOffset + "\"");
-            String replace6 = replace5.replace("minio-ip", minioConfiguration.getEndpointWithoutHttp());
-            String replace7 = replace6.replace("minio-access-key", minioConfiguration.getAccessKey());
-            String replace8 = replace7.replace("minio-secret-key", minioConfiguration.getSecretKey());
-
-            String replace9 = replace8.replace("algorithm-container", "algorithm-" + projectId);
-            String replace10 = replace9.replace("algorithm-image", algorithmDockerImage);
-
-            String replace11 = replace10.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
-            String replace12 = replace11.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
-            String replace13 = replace12.replace("node-name", nodeName);     // 指定 pod 运行节点
-
-            String replace14;
-            if (cpuOrder != null) {
-                replace14 = replace13.replace("cpu-order", "\"" + cpuOrder + "\"");     // 指定 cpu 编号
-                stringRedisTemplate.opsForValue().set("project:" + projectId + ":pod:" + podName + ":cpu", cpuOrder + "");    //  pod 运行使用的 cpu编号
-            } else {
-                replace14 = replace13;
-            }
-
-            if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
-                String replace15 = replace14.replace("vtd-image", kubernetesConfiguration.getImageVtdGpu());
-                finalYaml = replace15.replace("vtd-command", kubernetesConfiguration.getCommandVtdGpu());
-            } else if (DictConstants.NOT_USE_GPU.equals(isChoiceGpu)) {
-                String replace15 = replace14.replace("vtd-image", kubernetesConfiguration.getImageVtdNogpu());
-                finalYaml = replace15.replace("vtd-command", kubernetesConfiguration.getCommandVtdNogpu());
-            } else {
-                throw new RuntimeException("createTempYaml() 是否使用 gpu:" + isChoiceGpu);
-            }
-        } else if ("2".equals(modelType)) {
-            String podString = FileUtil.read(new File(carsimPodTemplateYaml));
-            String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
-            String replace1 = replace0.replace("simulation-cloud-ip", simulationCloudIp);
-            String replace2 = replace1.replace("kafka-ip", kafkaIp);
-            String replace3 = replace2.replace("kafka-topic", projectId);     // 消息主题名称为 projectId
-            String replace4 = replace3.replace("kafka-partition", "\"" + kafkaPartition + "\"");
-            String replace5 = replace4.replace("kafka-offset", "\"" + kafkaOffset + "\"");
-            String replace6 = replace5.replaceAll("minio-ip", minioConfiguration.getEndpointWithoutHttp());
-            String replace7 = replace6.replaceAll("minio-access-key", minioConfiguration.getAccessKey());
-            String replace8 = replace7.replaceAll("minio-secret-key", minioConfiguration.getSecretKey());
-
-            String replace9 = replace8.replace("algorithm-container", "algorithm-" + projectId);
-            String replace10 = replace9.replace("algorithm-image", algorithmDockerImage);
-
-            String replace11 = replace10.replace("carsim-container", "carsim-" + projectId);
-            String replace12 = replace11.replace("carsim-image", kubernetesConfiguration.getCarsimImage());
-            String replace13 = replace12.replace("carsim-command", kubernetesConfiguration.getCarsimCommand());
-            String replace14 = replace13.replace("minio-bucket", minioConfiguration.getBucketName());
-            String replace15 = replace14.replace("par-path", vehicleMapper.selectParPathByVehicleConfigId(vehicleConfigId));
-
-            String replace16 = replace15.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
-            String replace17 = replace16.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
-            String replace18 = replace17.replace("node-name", nodeName);     // 指定 pod 运行节点
-
-            String replace19;
-            if (cpuOrder != null) {
-                replace19 = replace18.replace("cpu-order", "\"" + cpuOrder + "\"");     // 指定 cpu 编号
-                stringRedisTemplate.opsForValue().set("project:" + projectId +":pod:" + podName + ":cpu", cpuOrder + "");    //  pod 运行使用的 cpu编号
-            } else {
-                replace19 = replace18;
-            }
-
-            if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
-                log.info("createTempYaml() k8s参数为:" + kubernetesConfiguration);
-                log.info("createTempYaml() yaml模板为:" + replace12);
-                String replace20 = replace19.replace("vtd-image", kubernetesConfiguration.getImageVtdGpu());
-                finalYaml = replace20.replace("vtd-command", kubernetesConfiguration.getCommandVtdCarsimGpu());
-            } else if (DictConstants.NOT_USE_GPU.equals(isChoiceGpu)) {
-                String replace20 = replace19.replace("vtd-image", kubernetesConfiguration.getImageVtdNogpu());
-                finalYaml = replace20.replace("vtd-command", kubernetesConfiguration.getCommandVtdCarsimNogpu());
-            } else {
-                throw new RuntimeException("createTempYaml() 是否使用 gpu:" + isChoiceGpu);
-            }
-        } else {
-            throw new RuntimeException("createTempYaml() 模型类型错误:" + modelType);
-        }
-        log.info("保存项目 " + projectId + " 的 yaml 文件:" + yamlPath);
-        FileUtil.writeStringToLocalFile(finalYaml, yamlPath);
-        String yamlRedisKey = "project:" + projectId + ":node:" + nodeName + ":yaml:" + podName;
-        stringRedisTemplate.opsForValue().set(yamlRedisKey, yamlPath);
-        return yamlRedisKey;
-    }
-}

+ 571 - 90
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -1,15 +1,18 @@
 package com.css.simulation.resource.scheduler.service;
 
 import api.common.pojo.constants.DictConstants;
+import api.common.pojo.dto.ProjectMessageDTO;
 import api.common.pojo.po.scheduler.SchedulerProjectPO;
 import api.common.util.*;
+import com.css.simulation.resource.scheduler.configuration.custom.CustomConfiguration;
 import com.css.simulation.resource.scheduler.configuration.docker.DockerConfiguration;
 import com.css.simulation.resource.scheduler.configuration.git.GitConfiguration;
+import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
+import com.css.simulation.resource.scheduler.configuration.minio.MinioConfiguration;
 import com.css.simulation.resource.scheduler.configuration.redis.CustomRedisClient;
-import com.css.simulation.resource.scheduler.data.entity.AlgorithmEntity;
-import com.css.simulation.resource.scheduler.data.entity.IndexTemplateEntity;
-import com.css.simulation.resource.scheduler.data.entity.PrefixEntity;
-import com.css.simulation.resource.scheduler.data.entity.SceneEntity;
+import com.css.simulation.resource.scheduler.data.entity.*;
+import com.css.simulation.resource.scheduler.data.model.DynamicsModel;
+import com.css.simulation.resource.scheduler.data.model.VehicleModel;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.util.*;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -19,70 +22,84 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
 import java.io.File;
 import java.io.InputStream;
 import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 @Service
 @Slf4j
 public class ProjectService {
 
-    /*
-      algorithm-platform:
-      appid: 2af6f44d98104dc5adcbfb49809ff9d5
-      secret: db129a741fde1e9f474199dea24f3901
-      token-uri: https://open.zoogooy.com.cn/cgi-bin/token/token?grant_type=client_credential
-      algorithm-list-uri: https://open.zoogooy.com.cn/cgi-bin/api/icv-algorithm-agg/simulation/evaluation
-      algorithm-addr-uri:  https://open.zoogooy.com.cn/cgi-bin/api/icv-algorithm-agg/simulation/download
-     */
-
-    @Value("${algorithm-platform.appid}")
-    private String algorithmPlatformAppid;
-    @Value("${algorithm-platform.secret}")
-    private String algorithmPlatformSecret;
-    @Value("${algorithm-platform.token-uri}")
-    private String algorithmPlatformTokenUri;
-    @Value("${algorithm-platform.algorithm-addr-uri}")
-    private String algorithmPlatformAlgorithmAddrUri;
     @Value("${scheduler.minio-path.project-result}")
     private String projectResultPathOfMinio;
     @Value("${scheduler.linux-path.temp}")
     private String linuxTempPath;
-    @Value("${scheduler.linux-path.pod-template-yaml}")
-    private String podTemplateYaml;
     @Value("${scheduler.linux-path.pod-yaml-directory}")
     private String podYamlDirectory;
     @Value("${minio.bucket-name}")
     private String bucketName;
+    @Value("${scheduler.linux-path.vtd-pod-template-yaml}")
+    private String vtdPodTemplateYaml;
+    @Value("${scheduler.linux-path.carsim-pod-template-yaml}")
+    private String carsimPodTemplateYaml;
+    @Value("${scheduler.simulation-cloud-ip}")
+    private String simulationCloudIp;
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String kafkaIp;
 
+    // -------------------------------- Comment --------------------------------
+    @Resource
+    private MinioClient minioClient;
     @Resource
     private StringRedisTemplate stringRedisTemplate;
-    @Resource(name = "myKafkaAdmin")
-    private Admin kafkaAdminClient;
     @Resource
-    private CloseableHttpClient closeableHttpClient;
+    private VehicleMapper vehicleMapper;
     @Resource
-    private RequestConfig requestConfig;
+    private SensorCameraMapper sensorCameraMapper;
     @Resource
-    private MinioClient minioClient;
+    private SensorOgtMapper sensorOgtMapper;
+    @Resource
+    private AlgorithmMapper algorithmMapper;
+    @Resource
+    private ClusterMapper clusterMapper;
+    @Resource
+    private IndexMapper indexMapper;
     @Resource
     private TaskMapper taskMapper;
     @Resource
-    private IndexTemplateMapper indexTemplateMapper;
+    private TaskUtil taskUtil;
     @Resource
-    private SceneMapper sceneMapper;
+    private KafkaTemplate<String, String> kafkaTemplate;
+    @Resource(name = "myKafkaAdmin")
+    private Admin kafkaAdminClient;
     @Resource
-    private AlgorithmMapper algorithmMapper;
+    private CustomConfiguration customConfiguration;
+    @Resource
+    private KubernetesConfiguration kubernetesConfiguration;
+    @Resource
+    private MinioConfiguration minioConfiguration;
     @Resource
     private ProjectUtil projectUtil;
     @Resource
+    private CloseableHttpClient closeableHttpClient;
+    @Resource
+    private RequestConfig requestConfig;
+    @Resource
+    private IndexTemplateMapper indexTemplateMapper;
+    @Resource
+    private SceneMapper sceneMapper;
+    @Resource
     private DockerConfiguration dockerConfiguration;
     @Resource
     private GitConfiguration gitConfiguration;
@@ -93,25 +110,529 @@ public class ProjectService {
     @Resource
     private CustomRedisClient customRedisClient;
 
-
     // -------------------------------- Comment --------------------------------
 
-//    /**
-//     * @param projectMessageDTO 初始接收到的项目启动信息
-//     * @param projectWaitingKey projectWaitingKey
-//     * @param projectRunningKey projectRunningKey
-//     */
-//    @Transactional
-//    public void prepare(ProjectMessageDTO projectMessageDTO, String projectWaitingKey, String projectRunningKey) {
-//        String projectId = projectMessageDTO.getProjectId();
-//
-//        //2 将 redis 中该项目旧的信息则直接删除(包括 waitingKey)
-//        RedisUtil.deleteByPrefix(stringRedisTemplate, projectWaitingKey);
-//        RedisUtil.deleteByPrefix(stringRedisTemplate, projectRunningKey);
-//    }
+    /**
+     * 接收到运行信息立即复制一份数据作为据运行数
+     *
+     * @param projectMessageDTO 项目启动消息
+     */
+    public void createTaskAndFixData(ProjectMessageDTO projectMessageDTO) {
+        //* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
+        String projectId = projectMessageDTO.getProjectId();                // 手动执行项目 id 或 自动执行子项目 id
+        String projectType = projectMessageDTO.getType();                   // 项目类型
+        String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
+        try {
+            String modelType = projectMessageDTO.getModelType();                // 模型类型,1 动力学模型 2 carsim模型
+            String packageId = projectMessageDTO.getScenePackageId();           // 场景测试包 id
+            String vehicleConfigId = projectMessageDTO.getVehicleConfigId();    // 模型配置 id
+            String algorithmId = projectMessageDTO.getAlgorithmId();            // 模型配置 id
+            long videoTime = projectMessageDTO.getMaxSimulationTime();          // 结果视频的时长
+            String userId = projectUtil.getUserIdByProjectIdAndProjectType(projectId, projectType);
+            String projectPath = linuxTempPath + "project/" + projectId + "/";
+            FileUtil.mkdir(projectPath);
+            //5 将该 project 下所有旧的指标得分删除。
+            taskMapper.deleteByProject(projectId);
+            indexMapper.deleteFirstTargetScoreByProjectId(projectId);
+            indexMapper.deleteLastTargetScoreByProjectId(projectId);
+            // -------------------------------- 1 查询场景 --------------------------------
+            log.info("项目 " + projectId + " 开始查询场景。");
+            //根据场景测试包 packageId,拿到场景集合(包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
+            List<SceneEntity> sceneEntityList = getSceneList(projectId, packageId);
+            int taskTotal = sceneEntityList.size();
+            projectMessageDTO.setTaskTotal(taskTotal);
+            projectMessageDTO.setTaskCompleted(0);
+            //去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
+            Set<SceneEntity> sceneEntitySet = new HashSet<>(sceneEntityList);
+            log.info("项目 " + projectId + " 场景包括:" + sceneEntitySet);
+            // -------------------------------- 2 算法导入 --------------------------------
+            log.info("项目 " + projectId + " 开始算法导入。");
+            String algorithmDockerImage = handleAlgorithm(projectId, algorithmId);
+            log.info("项目 " + projectId + " 算法已导入:" + algorithmDockerImage);
+            // -------------------------------- 3 查询模型 --------------------------------
+            if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
+                log.debug("项目 " + projectId + " 开始查询模型。");
+                //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
+                VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
+                List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
+                List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
+                // -------------------------------- 4 保存任务消息 --------------------------------
+                log.debug("项目 " + projectId + " 开始保存任务消息。");
+                List<TaskEntity> taskList = new ArrayList<>();
+                for (SceneEntity sceneEntity : sceneEntitySet) {
+                    String sceneId = sceneEntity.getId();
+                    //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
+                    List<String> lastTargetIdList = null;
+                    if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
+                        lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
+                    } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
+                        lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
+                    }
+                    if (CollectionUtil.isEmpty(lastTargetIdList)) {
+                        throw new RuntimeException("项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
+                    }
+                    for (String lastTargetId : lastTargetIdList) {
+                        String taskId = StringUtil.getRandomUUID();
+                        // 保存任务信息
+                        TaskEntity taskEntity = TaskEntity.builder() // run_start_time 和 run_end_time 不填
+                                .id(taskId).pId(projectId).sceneId(sceneId).lastTargetId(lastTargetId).sceneName(sceneEntity.getName()).sceneType(sceneEntity.getType()).runState(DictConstants.TASK_PENDING).runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId).build();
+                        taskEntity.setCreateTime(TimeUtil.getNowForMysql());
+                        taskEntity.setCreateUserId(userId);
+                        taskEntity.setModifyTime(TimeUtil.getNowForMysql());
+                        taskEntity.setModifyUserId(userId);
+                        taskEntity.setModifyTime(TimeUtil.getNowForMysql());
+                        taskEntity.setIsDeleted("0");
+                        taskList.add(taskEntity);
+                        // 将 xosc、xodr、osgb 全部上传到仿真结果路径
+                        String scenarioOsc = sceneEntity.getScenarioOsc();
+                        String[] splitXosc = scenarioOsc.split("/");
+                        String xoscName = splitXosc[splitXosc.length - 1];
+                        String[] xoscNameSplit = xoscName.split("\\.");
+                        String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
+                        String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
+                        String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
+                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
+                        MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
+                        FileUtil.rm(xoscPathOfLinux);   // 删除临时文件
+
+                        String scenarioOdr = sceneEntity.getScenarioOdr();
+                        String[] splitXodr = scenarioOdr.split("/");
+                        String xodrName = splitXodr[splitXodr.length - 1];
+                        String[] xodrNameSplit = xodrName.split("\\.");
+                        String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
+                        String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
+                        String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
+                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
+                        MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
+                        FileUtil.rm(xodrPathOfLinux);   // 删除临时文件
+
+                        String scenarioOsgb = sceneEntity.getScenarioOsgb();
+                        String[] splitOsgb = scenarioOsgb.split("/");
+                        String osgbName = splitOsgb[splitOsgb.length - 1];
+                        String[] osgbNameSplit = osgbName.split("\\.");
+                        String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
+                        String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
+                        String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
+                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
+                        MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
+                        FileUtil.rm(osgbPathOfLinux);   // 删除临时文件
+
+                        // 组装 task 消息
+                        TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleModel.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(DynamicsModel.builder().dynamics_maxspeed(vehicleEntity.getMaxSpeed()).dynamics_enginepower(vehicleEntity.getEnginePower()).dynamics_maxdecel(vehicleEntity.getMaxDeceleration()).dynamics_maxsteering(vehicleEntity.getMaxSteeringAngle()).dynamics_mass(vehicleEntity.getMass()).dynamics_frontsurfaceeffective(vehicleEntity.getFrontSurfaceEffective()).dynamics_airdragcoefficient(vehicleEntity.getAirDragCoefficient()).dynamics_rollingresistance(vehicleEntity.getRollingResistanceCoefficient()).dynamics_wheeldiameter(vehicleEntity.getWheelDiameter()).dynamics_wheeldrive(vehicleEntity.getWheelDrive()).dynamics_overallefficiency(vehicleEntity.getOverallEfficiency()).dynamics_distfront(vehicleEntity.getFrontDistance()).dynamics_distrear(vehicleEntity.getRearDistance()).dynamics_distleft(vehicleEntity.getLeftDistance()).dynamics_distright(vehicleEntity.getRightDistance()).dynamics_distheight(vehicleEntity.getHeightDistance()).dynamics_wheelbase(vehicleEntity.getWheelbase()).build()).sensors(SensorsEntity.builder()   // 根据 vehicleId 查询绑定的传感器列表
+                                .camera(cameraEntityList).OGT(ogtEntityList).build()).build()).build();
+                        FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
+                        log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
+                    }
+                }
+                taskUtil.batchInsertTask(taskList);
+                log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
+            } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
+                log.debug("项目 " + projectId + " 开始查询模型。");
+                VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
+                List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
+                List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
+                // -------------------------------- 4 保存任务消息 --------------------------------
+                log.debug("项目 " + projectId + " 开始保存任务消息。");
+                List<TaskEntity> taskList = new ArrayList<>();
+                for (SceneEntity sceneEntity : sceneEntitySet) {
+                    String sceneId = sceneEntity.getId();
+                    //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
+                    List<String> lastTargetIdList = null;
+                    if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
+                        lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
+                    } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
+                        lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
+                    }
+                    if (CollectionUtil.isEmpty(lastTargetIdList)) {
+                        throw new RuntimeException("项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
+                    }
+                    for (String lastTargetId : lastTargetIdList) {
+                        String taskId = StringUtil.getRandomUUID();
+                        // 保存任务信息
+                        TaskEntity taskEntity = TaskEntity.builder() // run_start_time 和 run_end_time 不填
+                                .id(taskId).pId(projectId).sceneId(sceneId).lastTargetId(lastTargetId).sceneName(sceneEntity.getName()).sceneType(sceneEntity.getType()).runState(DictConstants.TASK_PENDING).runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId).build();
+                        taskEntity.setCreateTime(TimeUtil.getNowForMysql());
+                        taskEntity.setCreateUserId(userId);
+                        taskEntity.setModifyTime(TimeUtil.getNowForMysql());
+                        taskEntity.setModifyUserId(userId);
+                        taskEntity.setModifyTime(TimeUtil.getNowForMysql());
+                        taskEntity.setIsDeleted("0");
+                        taskList.add(taskEntity);
+                        // 将 xosc、xodr、osgb 全部上传到仿真结果路径
+                        String scenarioOsc = sceneEntity.getScenarioOsc();
+                        String[] splitXosc = scenarioOsc.split("/");
+                        String xoscName = splitXosc[splitXosc.length - 1];
+                        String[] xoscNameSplit = xoscName.split("\\.");
+                        String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
+                        String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
+                        String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
+                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
+                        MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
+
+                        String scenarioOdr = sceneEntity.getScenarioOdr();
+                        String[] splitXodr = scenarioOdr.split("/");
+                        String xodrName = splitXodr[splitXodr.length - 1];
+                        String[] xodrNameSplit = xodrName.split("\\.");
+                        String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
+                        String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
+                        String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
+                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
+                        MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
+
+                        String scenarioOsgb = sceneEntity.getScenarioOsgb();
+                        String[] splitOsgb = scenarioOsgb.split("/");
+                        String osgbName = splitOsgb[splitOsgb.length - 1];
+                        String[] osgbNameSplit = osgbName.split("\\.");
+                        String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
+                        String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
+                        String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
+                        MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
+                        MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
+                        log.info("已经将 xosc、xodr、osgb 上传到 minio 的结果文件目录:" + projectResultPathOfMinio);
+
+                        // 组装 task 消息
+                        // carsim 不需要查询模型参数
+                        TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleModel.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(null).sensors(SensorsEntity.builder()   // 根据 vehicleId 查询绑定的传感器列表
+                                .camera(cameraEntityList).OGT(ogtEntityList).build()).build()).build();
+
+                        FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
+                        log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
+                    }
+                }
+                taskUtil.batchInsertTask(taskList);
+                log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
+            }
+
+            //* -------------------------------- 4 开始排队 --------------------------------
+            cacheProject(projectMessageDTO);
+        } catch (Exception e) {
+            log.error("项目报错。", e);
+            stopProject(isChoiceGpu, projectId, projectType, e.getMessage());
+            throw new RuntimeException(e);
+        }
+
+    }
+
+
+    /**
+     * 任务运行前首先判断用户是否拥有可分配资源
+     *
+     * @param projectMessageDTO 项目启动消息
+     */
+    @SneakyThrows
+    public void cacheProject(ProjectMessageDTO projectMessageDTO) {
+        log.debug("判断用户是否拥有可分配资源:" + projectMessageDTO);
+        //1 读取 kafka 的 project 信息
+        final String modelType = projectMessageDTO.getModelType();
+        String projectId = projectMessageDTO.getProjectId();    // 手动执行项目 id 或 自动执行子项目 id
+        long parallelism = projectMessageDTO.getParallelism();   // 项目并行度
+        String projectType = projectMessageDTO.getType(); // 项目类型
+        //2 获取用户信息(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
+        final UserEntity userEntity = projectUtil.getUserEntityByProjectIdAndProjectType(projectId, projectType);
+        String projectUserId = userEntity.getId();
+        log.debug("项目 " + projectId + " 的创建人为:" + userEntity);
+        String roleCode = userEntity.getRoleCode();
+        String useType = userEntity.getUseType();
+        ClusterEntity clusterEntity;
+        String clusterUserId;  // 项目实际运行使用的用户集群
+        if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
+            clusterUserId = DictConstants.SYSTEM_CLUSTER_ID;
+            log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
+            PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
+            run(projectMessageDTO, clusterUserId, modelType, DictConstants.SYSTEM_USER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
+            return;
+        } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
+            clusterUserId = projectUserId;
+            clusterEntity = clusterMapper.selectByUserId(clusterUserId);
+            log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterEntity);
+        } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
+            if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) {   //3-3 普通子账户,根据自己的独占节点排队
+                clusterUserId = projectUserId;
+                clusterEntity = clusterMapper.selectByUserId(clusterUserId);
+                log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通独占子账户(自己的集群),集群为:" + clusterEntity);
+            } else if (DictConstants.USER_TYPE_PUBLIC.equals(useType)) {    //3-4 共享子账户,根据父账户的共享节点排队
+                clusterUserId = userEntity.getCreateUserId();
+                clusterEntity = clusterMapper.selectByUserId(clusterUserId);
+                log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通共享子账户(父账户的集群),集群为:" + clusterEntity);
+            } else {
+                throw new RuntimeException("用户" + projectUserId + "未知占用类型:" + useType);
+            }
+        } else {
+            throw new RuntimeException("未知角色类型:" + roleCode);
+        }
+        // 获取仿真软件证书数量和动力学软件证书数量(vtd占一个仿真证书,carsim各占一个)
+        PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterEntity.getId(), projectId);
+        final Integer usingSimulationLicenseNumber = projectUtil.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_SIMULATION);
+        final Integer usingDynamicLicenseNumber;
+        final Integer numSimulationLicense = clusterEntity.getNumSimulationLicense();
+        final Integer numDynamicLicense = clusterEntity.getNumDynamicLicense();
+        final String clusterId = clusterEntity.getId();
+        //1 判断仿真证书是否够用,如果
+        // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
+        if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
+            if (usingSimulationLicenseNumber + parallelism <= numSimulationLicense) {
+                run(projectMessageDTO, clusterUserId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
+            } else {
+                log.info("VTD 项目 {} 并行度为 {},用户 {} 的集群 {} 的仿真证书总数量为 {},已使用数量为 {}。该项目加入等待队列。 ", projectId, parallelism, clusterUserId, clusterId, numSimulationLicense, usingSimulationLicenseNumber);
+                wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
+            }
+        } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
+            usingDynamicLicenseNumber = projectUtil.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
+            if (usingSimulationLicenseNumber + parallelism <= numSimulationLicense && usingDynamicLicenseNumber + parallelism <= numDynamicLicense) {
+                run(projectMessageDTO, clusterUserId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
+            } else {
+                log.info("CARSIM 项目 {} 并行度为 {},用户 {} 的集群 {} 的仿真证书总数量为 {},已使用数量为 {},动力学证书总数量为 {},已使用数量为 {}。该项目加入等待队列。 ", projectId, parallelism, clusterUserId, clusterId, numSimulationLicense, usingSimulationLicenseNumber, numDynamicLicense, usingDynamicLicenseNumber);
+                wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
+            }
+        } else {
+            throw new RuntimeException("未知模型类型:" + modelType);
+        }
+    }
+
+    //* -------------------------------- 运行 --------------------------------
+
+    /**
+     * @param projectMessageDTO 初始接收到的项目启动信息
+     * @param clusterId         集群ID
+     * @param projectRunningKey projectRunningKey
+     * @param projectWaitingKey projectWaitingKey
+     */
+    public void run(ProjectMessageDTO projectMessageDTO, String clusterUserId, String modelType, String clusterId, String projectRunningKey, String projectWaitingKey) {
+        String projectId = projectMessageDTO.getProjectId();    // 项目 id
+        String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
+        int parallelism = projectMessageDTO.getParallelism();  // 期望并行度
+        //1 获取集群剩余可用并行度
+        int restParallelism = projectUtil.getRestParallelism(isChoiceGpu);
+        //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
+        if (restParallelism > 0L) {
+            log.info("集群 " + clusterId + " 执行项目 " + projectId);
+            if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)) {
+                projectUtil.useLicense(clusterUserId, modelType, parallelism);
+            }
+            // 设置实际的并行度
+            projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism));   // 设置实际的并行度
+            parseProject(projectMessageDTO, projectRunningKey, isChoiceGpu);
+        } else {
+            log.info("服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
+            wait(projectWaitingKey, projectMessageDTO);
+        }
+    }
+
+    /**
+     * @param projectMessageDTO 初始接收到的项目启动信息
+     * @param projectRunningKey projectRunningKey
+     */
+    @SneakyThrows
+    public void parseProject(ProjectMessageDTO projectMessageDTO, String projectRunningKey, String isChoiceGpu) {
+        String projectId = projectMessageDTO.getProjectId();    // 项目 id
+        String modelType = projectMessageDTO.getModelType();
+        String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
+        int currentParallelism = projectMessageDTO.getCurrentParallelism();   // 当前并行度
+        String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
+        String projectPath = linuxTempPath + "project/" + projectId + "/";
+        // -------------------------------- 1 获取任务 json 列表 --------------------------------
+        List<String> taskJsonList = FileUtil.listAbsolutePathByTypeAndLength(projectPath, "json", (StringUtil.getRandomUUID() + ".json").length());
+        int taskTotal = taskJsonList.size();
+        projectMessageDTO.setTaskTotal(taskTotal);
+        projectMessageDTO.setTaskCompleted(0);
+        // 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
+        //1 获取剩余并行度和即将使用的各node的并行度
+        Map<String, Integer> nodeMap0 = projectUtil.getNodeMap(isChoiceGpu);
+        Map<String, Integer> nodeMap = projectUtil.getNodeMapToUse(isChoiceGpu, Math.min(currentParallelism, taskTotal));
+        //2 将指定 node 的并行度减少
+        nodeMap.keySet().forEach(nodeName -> projectUtil.decrementParallelism(isChoiceGpu, nodeName, nodeMap.get(nodeName)));
+        // 重新设置实际使用的并行度并保存到 redis
+        int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
+        projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
+        log.info("项目 " + projectId + " 运行在:" + nodeMap);
+        stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
+        //* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
+        String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
+        // -------------------------------- 4 发送任务消息 --------------------------------
+        List<NodeEntity> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
+        int messageNumber = 0;
+        ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1);   // 创建主题
+        TimeUnit.SECONDS.sleep(7);
+        // 需要即时启动的任务(并行度的大小)
+        CopyOnWriteArrayList<String> yamlToRunRedisKeyList = new CopyOnWriteArrayList<>();
+        for (String taskJsonPath : taskJsonList) {
+            String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
+            // 保存运行中的任务信息
+            String taskMessageKey = projectRunningKey + ":task:" + taskId + ":message";
+            String taskJson = FileUtil.read(taskJsonPath);
+            stringRedisTemplate.opsForValue().set(taskMessageKey, taskJson);
+
+            //4-5 将 projectId 作为 topic 名称,根据 parallelism 分散发送 task 信息到 kafka
+            SendResult<String, String> stringStringSendResult = kafkaTemplate.send(projectId, messageNumber % currentParallelism, taskId, taskJson).get();
+            RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
+            String topic = recordMetadata.topic();  // 消息发送到的topic
+            int partition = recordMetadata.partition(); // 消息发送到的分区
+            long offset = recordMetadata.offset();  // 消息在分区内的offset
+            log.info("发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
+            //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
+            // 选一个 count 最少的 node
+            String currentNodeName = "";
+            NodeEntity currentNodeEntity = null;
+            int currentCount = Integer.MAX_VALUE;
+            log.info("各节点已经预定的任务个数为:" + nodeListToCount);
+            for (NodeEntity nodeEntity : nodeListToCount) {
+                int tempCount = nodeEntity.getCount();
+                String tempNodeName = nodeEntity.getNodeName();
+                if (tempCount < currentCount) {
+                    currentCount = tempCount;
+                    currentNodeName = tempNodeName;
+                    currentNodeEntity = nodeEntity;
+                }
+            }
+            if (currentNodeEntity == null) {
+                String errorMessage = "挑选节点失败。";
+                log.info(errorMessage);
+                throw new RuntimeException(errorMessage);
+            }
+            currentNodeEntity.setCount(currentNodeEntity.getCount() + 1);
+            Integer cpuOrder = null;
+            if (currentCount == 0) {
+                // 根据各节点剩余并行度,倒序获取 cpu 编号
+                cpuOrder = nodeMap0.get(currentNodeName) - 1;
+                nodeMap0.put(currentNodeName, cpuOrder);
+            }
+            // 只有准备启动(即 currentCount == 0)的时候才指定 cpu 编号
+            log.info("创建任务 " + taskId + " 的 yaml:是否使用 gpu (0是1否)" + isChoiceGpu + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前节点名称为:" + currentNodeName + ",当前 cpu 编号为:" + cpuOrder);
+            String yamlRedisKey = createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
+
+            if (currentCount == 0) {
+                String podName = yamlRedisKey.split(":")[yamlRedisKey.split(":").length - 1];
+                log.info("将 pod 加入到启动列表 " + podName);
+                yamlToRunRedisKeyList.add(yamlRedisKey);
+            }
+            messageNumber++;
+        }
+        TimeUnit.SECONDS.sleep(6);
+        log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
+        for (String redisKey : yamlToRunRedisKeyList) {
+            projectUtil.createPodBegin(projectId, redisKey);
+        }
+        log.info("项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
+    }
+
+
+    //* -------------------------------- 等待 --------------------------------
+
+    /**
+     * @param projectWaitingKey 项目等待 key
+     * @param projectMessageDTO 项目信息
+     */
+    public void wait(String projectWaitingKey, ProjectMessageDTO projectMessageDTO) {
+        stringRedisTemplate.opsForValue().set(projectWaitingKey, JsonUtil.beanToJson(projectMessageDTO));
+    }
+
+
+    //* -------------------------------- Comment --------------------------------
+
+    /**
+     * 创建一个临时 yaml,node 在最后用 # 号隔开
+     */
+    @SneakyThrows
+    public String createTempYaml(String projectId, String vehicleConfigId, String modelType, String algorithmDockerImage, String nodeName, int kafkaPartition, long kafkaOffset, String isChoiceGpu, Integer cpuOrder) {
+        String podName = projectUtil.getRandomPodName(projectId);   // 生成 podName
+        String podYaml = projectUtil.getPodYamlName(nodeName, podName);     // 模板文件名称
+        String yamlPath = podYamlDirectory + podYaml;
+        String finalYaml;
+
+        if ("1".equals(modelType)) {
+            String podString = FileUtil.read(new File(vtdPodTemplateYaml));
+            String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
+            String replace1 = replace0.replace("simulation-cloud-ip", simulationCloudIp);
+            String replace2 = replace1.replace("kafka-ip", kafkaIp);
+            String replace3 = replace2.replace("kafka-topic", projectId);     // 消息主题名称为 projectId
+            String replace4 = replace3.replace("kafka-partition", "\"" + kafkaPartition + "\"");
+            String replace5 = replace4.replace("kafka-offset", "\"" + kafkaOffset + "\"");
+            String replace6 = replace5.replace("minio-ip", minioConfiguration.getEndpointWithoutHttp());
+            String replace7 = replace6.replace("minio-access-key", minioConfiguration.getAccessKey());
+            String replace8 = replace7.replace("minio-secret-key", minioConfiguration.getSecretKey());
+
+            String replace9 = replace8.replace("algorithm-container", "algorithm-" + projectId);
+            String replace10 = replace9.replace("algorithm-image", algorithmDockerImage);
+
+            String replace11 = replace10.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
+            String replace12 = replace11.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
+            String replace13 = replace12.replace("node-name", nodeName);     // 指定 pod 运行节点
+
+            String replace14;
+            if (cpuOrder != null) {
+                replace14 = replace13.replace("cpu-order", "\"" + cpuOrder + "\"");     // 指定 cpu 编号
+                stringRedisTemplate.opsForValue().set("project:" + projectId + ":pod:" + podName + ":cpu", String.valueOf(cpuOrder));    //  pod 运行使用的 cpu编号
+            } else {
+                replace14 = replace13;
+            }
+
+            if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
+                String replace15 = replace14.replace("vtd-image", kubernetesConfiguration.getImageVtdGpu());
+                finalYaml = replace15.replace("vtd-command", kubernetesConfiguration.getCommandVtdGpu());
+            } else if (DictConstants.NOT_USE_GPU.equals(isChoiceGpu)) {
+                String replace15 = replace14.replace("vtd-image", kubernetesConfiguration.getImageVtdNogpu());
+                finalYaml = replace15.replace("vtd-command", kubernetesConfiguration.getCommandVtdNogpu());
+            } else {
+                throw new RuntimeException("createTempYaml() 是否使用 gpu:" + isChoiceGpu);
+            }
+        } else if ("2".equals(modelType)) {
+            String podString = FileUtil.read(new File(carsimPodTemplateYaml));
+            String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
+            String replace1 = replace0.replace("simulation-cloud-ip", simulationCloudIp);
+            String replace2 = replace1.replace("kafka-ip", kafkaIp);
+            String replace3 = replace2.replace("kafka-topic", projectId);     // 消息主题名称为 projectId
+            String replace4 = replace3.replace("kafka-partition", "\"" + kafkaPartition + "\"");
+            String replace5 = replace4.replace("kafka-offset", "\"" + kafkaOffset + "\"");
+            String replace6 = replace5.replaceAll("minio-ip", minioConfiguration.getEndpointWithoutHttp());
+            String replace7 = replace6.replaceAll("minio-access-key", minioConfiguration.getAccessKey());
+            String replace8 = replace7.replaceAll("minio-secret-key", minioConfiguration.getSecretKey());
+
+            String replace9 = replace8.replace("algorithm-container", "algorithm-" + projectId);
+            String replace10 = replace9.replace("algorithm-image", algorithmDockerImage);
+
+            String replace11 = replace10.replace("carsim-container", "carsim-" + projectId);
+            String replace12 = replace11.replace("carsim-image", kubernetesConfiguration.getCarsimImage());
+            String replace13 = replace12.replace("carsim-command", kubernetesConfiguration.getCarsimCommand());
+            String replace14 = replace13.replace("minio-bucket", minioConfiguration.getBucketName());
+            String replace15 = replace14.replace("par-path", vehicleMapper.selectParPathByVehicleConfigId(vehicleConfigId));
+
+            String replace16 = replace15.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
+            String replace17 = replace16.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
+            String replace18 = replace17.replace("node-name", nodeName);     // 指定 pod 运行节点
+
+            String replace19;
+            if (cpuOrder != null) {
+                replace19 = replace18.replace("cpu-order", "\"" + cpuOrder + "\"");     // 指定 cpu 编号
+                stringRedisTemplate.opsForValue().set("project:" + projectId + ":pod:" + podName + ":cpu", String.valueOf(cpuOrder));    //  pod 运行使用的 cpu编号
+            } else {
+                replace19 = replace18;
+            }
+
+            if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
+                log.info("createTempYaml() k8s参数为:" + kubernetesConfiguration);
+                log.info("createTempYaml() yaml模板为:" + replace12);
+                String replace20 = replace19.replace("vtd-image", kubernetesConfiguration.getImageVtdGpu());
+                finalYaml = replace20.replace("vtd-command", kubernetesConfiguration.getCommandVtdCarsimGpu());
+            } else if (DictConstants.NOT_USE_GPU.equals(isChoiceGpu)) {
+                String replace20 = replace19.replace("vtd-image", kubernetesConfiguration.getImageVtdNogpu());
+                finalYaml = replace20.replace("vtd-command", kubernetesConfiguration.getCommandVtdCarsimNogpu());
+            } else {
+                throw new RuntimeException("createTempYaml() 是否使用 gpu:" + isChoiceGpu);
+            }
+        } else {
+            throw new RuntimeException("createTempYaml() 模型类型错误:" + modelType);
+        }
+        log.info("保存项目 " + projectId + " 的 yaml 文件:" + yamlPath);
+        FileUtil.writeStringToLocalFile(finalYaml, yamlPath);
+        String yamlRedisKey = "project:" + projectId + ":node:" + nodeName + ":yaml:" + podName;
+        stringRedisTemplate.opsForValue().set(yamlRedisKey, yamlPath);
+        return yamlRedisKey;
+    }
+
+    //* -------------------------------- 逻辑 --------------------------------
+
 
     @SneakyThrows
-    @Transactional
     public List<SceneEntity> getSceneList(String projectId, String packageId) {
 
         String allIndexPrefix = "project:" + projectId + ":package:" + packageId + ":all";
@@ -155,48 +676,12 @@ public class ProjectService {
 
     /**
      * 将 master 节点设置成镜像仓库,导入镜像的同时 commit 到仓库当中,供其他节点 pull
-     * {
-     * "data" : {
-     * "totalElements" : 19,
-     * "totalPage" : null,
-     * "currentPage" : 0,
-     * "size" : 0,
-     * "content" : [ {
-     * "id" : "ICV20220418170905",
-     * "algorithmId" : "2022041813435600064848bbc0e0efb441978d57",
-     * "algorithmName" : "SUAN FA2"
-     * "description" : "",
-     * "algorithmType" : "2"
-     * }, {
-     * "id" : "ICV20220408165152",
-     * "algorithmId" : "202204081650140005916540f84c37764591a5c4",
-     * "algorithmName" : "ceshi 33",
-     * "description" : "",
-     * "algorithmType" : "2"
-     * } ],
-     * "hasPreviousPage" : false,
-     * "hasNextPage" : false
-     * },
-     * "success" : true,
-     * "message" : "ok",
-     * "code" : 1,
-     * "nowTime" : "2022-04-22 10:14:40"
-     * }
-     * <p>
-     * {
-     * "data": "https://sysware-icv-algo-pri.obs.cn-north-4.myhuaweicloud.com:443/1658730835935/jybdc_0712_01.tar?AccessKeyId=0ZFICI4CB04DFHQTPDBF&Expires=1658917777&Signature=bPUT%2FnFwlHcAqOFsskYVPKOdnYs%3D",
-     * "success": true,
-     * "message": "ok",
-     * "code": 1,
-     * "nowTime": "2022-07-27 18:14:37"
-     * }
      *
      * @param projectId   项目 id
      * @param algorithmId 算法 id
      * @return 镜像名称
      */
     @SneakyThrows
-    @Transactional
     public String handleAlgorithm(String projectId, String algorithmId) {
         //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
         AlgorithmEntity algorithmEntity = algorithmMapper.selectById(algorithmId);
@@ -251,11 +736,11 @@ public class ProjectService {
             //0 查看算法是否已经导入
             if (!projectUtil.isImported(dockerImageWithoutVersion)) {
                 //1 获取 token
-                String tokenUrl = algorithmPlatformTokenUri + "?grant_type=client_credential&appid=" + algorithmPlatformAppid + "&secret=" + algorithmPlatformSecret;
+                String tokenUrl = customConfiguration.getAlgorithmPlatformTokenUri() + "?grant_type=client_credential&appid=" + customConfiguration.getAlgorithmPlatformAppid() + "&secret=" + customConfiguration.getAlgorithmPlatformSecret();
                 String tokenJson = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
                 String token = new ObjectMapper().readTree(tokenJson).path("data").path("access_token").asText();
                 //2 获取 下载地址
-                String downloadUrl = algorithmPlatformAlgorithmAddrUri + "?access_token=" + token + "&id=" + algorithmId;
+                String downloadUrl = customConfiguration.getAlgorithmPlatformAlgorithmAddrUri() + "?access_token=" + token + "&id=" + algorithmId;
                 //3 下载算法包
                 String downloadUrlJson = HttpUtil.get(closeableHttpClient, requestConfig, downloadUrl);
                 String tempDownloadUrl = new ObjectMapper().readTree(downloadUrlJson).path("data").asText();
@@ -270,10 +755,6 @@ public class ProjectService {
                 log.info("handleAlgorithm 算法镜像" + dockerImageWithoutVersion + "已导入。");
             }
         }
-
-//        LinuxUtil.execute("docker tag " + algorithmTarLinuxTempPath + " " + dockerImage);   // 标记镜像名称
-//        LinuxUtil.execute("docker login " + algorithmTarLinuxTempPath + " " + dockerImage); // 登录 harbor
-//        LinuxUtil.execute("docker push " + algorithmTarLinuxTempPath + " " + dockerImage);  // 推送镜像到仓库
         log.info("项目 " + projectId + " 使用的算法镜像为:" + dockerImage);
         return dockerImage;
     }
@@ -310,7 +791,7 @@ public class ProjectService {
 
 
         //2 删除 kafka 消息
-//        ApacheKafkaUtil.deleteTopic(kafkaAdminClient, projectId);
+        ApacheKafkaUtil.deleteTopic(kafkaAdminClient, projectId);
         //3 删除项目所有任务
         taskMapper.deleteByProject(projectId);
 

+ 122 - 7
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -1,29 +1,54 @@
 package com.css.simulation.resource.scheduler.service;
 
+import api.common.pojo.constants.DictConstants;
+import api.common.util.HttpUtil;
+import api.common.util.TimeUtil;
+import com.css.simulation.resource.scheduler.configuration.custom.CustomConfiguration;
+import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
 import com.css.simulation.resource.scheduler.configuration.redis.CustomRedisClient;
 import com.css.simulation.resource.scheduler.data.entity.PrefixEntity;
 import com.css.simulation.resource.scheduler.data.entity.ProjectEntity;
 import com.css.simulation.resource.scheduler.data.entity.TaskEntity;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-import com.css.simulation.resource.scheduler.util.ProjectUtil;
-import com.css.simulation.resource.scheduler.util.TaskUtil;
+import com.css.simulation.resource.scheduler.util.*;
+import io.kubernetes.client.openapi.ApiClient;
+import io.minio.MinioClient;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import java.util.concurrent.TimeUnit;
 
 @Service
 @Slf4j
 public class TaskService {
+    @Value("${minio.bucket-name}")
+    private String bucketName;
+    @Value("${scheduler.minio-path.project-result}")
+    private String resultPathMinio;
+    @Resource
+    private StringRedisTemplate stringRedisTemplate;
     @Resource
     private TaskMapper taskMapper;
     @Resource
+    private MinioClient minioClient;
+    @Resource
     private ProjectUtil projectUtil;
     @Resource
-    private TaskUtil taskUtil;
+    private KubernetesConfiguration kubernetesConfiguration;
+    @Resource
+    private ApiClient apiClient;
     @Resource
     private CustomRedisClient customRedisClient;
+    @Resource
+    private CustomConfiguration customConfiguration;
+    @Resource
+    private TaskUtil taskUtil;
+    @Resource
+    private ProjectService projectService;
 
 
     // -------------------------------- Comment --------------------------------
@@ -40,10 +65,101 @@ public class TaskService {
             ProjectEntity projectEntity = projectUtil.getProjectByProjectId(projectId);
             String projectType = projectEntity.getProjectType();  // 项目类型
             String maxSimulationTime = projectEntity.getMaxSimulationTime();  // 项目类型
-            String userId = taskEntity.getCreateUserId();   // 用户 id
-            PrefixEntity redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaskId(userId, projectId, taskId);
+            String projectUserId = taskEntity.getCreateUserId();   // 用户 id
+            PrefixEntity redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaskId(projectUserId, projectId, taskId);
             // 判断是否完成
-            taskUtil.isProjectCompleted(redisPrefix, userId, projectId, projectType, maxSimulationTime, taskId, state, podName);
+            boolean isCompleted;
+            String nodeName = projectUtil.getNodeNameOfPod(projectId, podName);
+            final String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
+            if (DictConstants.TASK_RUNNING.equals(state)) {  // 运行中的 pod 无需删除
+                // 将运行中的任务的 pod 名称放入 redis
+                stringRedisTemplate.opsForValue().set(redisPrefix.getTaskPodKey(), podName);
+                log.info("修改任务 " + taskId + " 的状态为 " + state + ",pod 名称为:" + podName);
+                taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
+                return;
+            } else { // 结束的 pod 都直接删除,并判断项目是否完成
+                // -------------------------------- 处理状态 --------------------------------
+                log.info("修改任务 {} 的状态为 {} ,pod 名称为 {} ,并删除 pod。", taskId, state, podName);
+                if (DictConstants.TASK_ABORTED.equals(state)) {
+                    String minioPathOfErrorLog = resultPathMinio + projectId + "/" + taskId + "error.log";
+                    boolean objectExist = MinioUtil.isObjectExist(minioClient, bucketName, minioPathOfErrorLog);
+                    String targetEvaluate;
+                    if (objectExist) {
+                        String errorString = MinioUtil.downloadToString(minioClient, bucketName, minioPathOfErrorLog);
+                        String[] lines = errorString.split("\n");
+                        StringBuilder errorMessage = new StringBuilder();
+                        for (String line : lines) {
+                            if (line.startsWith("Original Error")) {
+                                errorMessage.append(line).append("\n");
+                            }
+                            if (line.startsWith("Possible Cause")) {
+                                errorMessage.append(line);
+                                break;
+                            }
+                        }
+                        targetEvaluate = errorMessage.toString();
+                    } else {
+                        targetEvaluate = DictConstants.TASK_ERROR_REASON_2;
+                    }
+                    taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), targetEvaluate);
+                } else if (DictConstants.TASK_TERMINATED.equals(state)) {
+                    taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_3);
+                } else if (DictConstants.TASK_ANALYSIS.equals(state)) { // 该状态只会获得一次
+                    taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
+                    // 查询项目是否使用 CPU 生成视频
+                    if (DictConstants.VIDEO_CPU.equals(isChoiceGpu)) {
+                        String generateVideoKey = "task:" + taskId + ":generateVideo";
+                        customRedisClient.set(generateVideoKey, "0");
+                        HttpUtil.get(customConfiguration.getGenerateVideoUrl().replace("simulation-resource-video", nodeName) + "?generateVideoKey=" + generateVideoKey + "&nodeName=" + nodeName + "&projectId=" + projectId + "&projectType=" + projectType + "&maxSimulationTime=" + maxSimulationTime + "&taskId=" + taskId);
+//                    HttpUtil.get("http://" + nodeName + ":8007//simulation/resource/video/generate" + "?generateVideoKey=" + generateVideoKey + "&nodeName=" + nodeName + "&projectId" + projectId + "&projectType" + projectType + "&maxSimulationTime" + maxSimulationTime + "&taskId" + taskId);
+//                    videoFeignClient.generateVideo(generateVideoKey, nodeName, projectId, projectType, maxSimulationTime, taskId);
+                        log.info("任务 {} 使用 CPU 生成视频开始>>>>>>>", taskId);
+                        while (true) {
+                            TimeUnit.SECONDS.sleep(1);
+                            final String generateVideoValue = customRedisClient.get(generateVideoKey);
+                            if (DictConstants.YES.equals(generateVideoValue)) {
+                                customRedisClient.delete(generateVideoKey);
+                                break;
+                            }
+                        }
+                        log.info("任务 {} 使用 CPU 生成视频结束<<<<<<<", taskId);
+                    }
+                }
+                // -------------------------------- 判断项目是否结束 --------------------------------
+                isCompleted = projectUtil.complete(redisPrefix, projectId);
+                if (isCompleted) {
+                    //如果项目已完成先把 pod 删除,并归还并行度
+                    KubernetesUtil.deletePod2(apiClient, kubernetesConfiguration.getNamespace(), podName);
+                    projectUtil.incrementOneParallelism(isChoiceGpu, nodeName);
+                    projectUtil.releaseLicense(projectUtil.getClusterUserIdByProjectUserId(projectUserId), projectUtil.getModelTypeByProjectIdAndProjectType(projectId, projectType), 1);
+                } else {
+                    log.info("项目 " + projectId + " 还未运行完成。");
+                    projectUtil.createNextPod(projectUserId, projectId, projectType, nodeName, podName);
+                }
+                RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getTaskMessageKey());
+                RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getTaskPodKey());
+            }
+            //* -------------------------------- 打分 --------------------------------
+            String lock2 = "project:" + projectId + ":completed-lock";
+            try {
+                if (isCompleted) {
+                    customRedisClient.tryLock(lock2, 10 * 60L);
+                    log.info("项目 {} 开始打分。", projectId);
+                    taskUtil.score(redisPrefix.getProjectRunningKey(), projectUserId, projectId, projectType);
+                    log.info("项目 {} 计算评价等级。", projectId);
+                    taskUtil.evaluationLevel(projectId);
+                    log.info("项目 {} 开始释放资源。", projectId);
+                    taskUtil.done(redisPrefix, projectId, projectType);
+                    log.info("项目 {} 运行结束。", projectId);
+                }
+            } finally {
+                customRedisClient.unlock(lock2);
+            }
+        } catch (Exception e) {
+            log.error("项目报错。", e);
+            final ProjectEntity project = projectUtil.getProjectByProjectId(projectId);
+            projectService.stopProject(project.getIsChoiceGpu(), projectId, project.getProjectType(), e.getMessage());
+            throw new RuntimeException(e);
         } finally {
             customRedisClient.unlock(lock1);
         }
@@ -51,7 +167,6 @@ public class TaskService {
     }
 
 
-
     public Boolean confirm(String taskId) {
         return taskUtil.taskConfirm(taskId);
     }

+ 1 - 108
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/TaskUtil.java

@@ -3,14 +3,11 @@ package com.css.simulation.resource.scheduler.util;
 import api.common.pojo.constants.DictConstants;
 import api.common.util.*;
 import com.css.simulation.resource.scheduler.configuration.custom.CustomConfiguration;
-import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
-import com.css.simulation.resource.scheduler.configuration.redis.CustomRedisClient;
 import com.css.simulation.resource.scheduler.data.entity.*;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.service.TaskIndexManager;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import io.kubernetes.client.openapi.ApiClient;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -42,8 +39,6 @@ public class TaskUtil {
     private String pyPath;
     @Value("${scheduler.linux-path.temp}")
     private String linuxTempPath;
-    @Value("${scheduler.minio-path.project-result}")
-    private String resultPathMinio;
     @Resource
     private StringRedisTemplate stringRedisTemplate;
     @Resource
@@ -64,15 +59,9 @@ public class TaskUtil {
     private ProjectUtil projectUtil;
     @Resource
     private SqlSessionFactory sqlSessionFactory;
-    @Resource
-    private KubernetesConfiguration kubernetesConfiguration;
-    @Resource
-    private ApiClient apiClient;
     @Resource(name = "myKafkaAdmin")
     private Admin admin;
     @Resource
-    private CustomRedisClient customRedisClient;
-    @Resource
     private CustomConfiguration customConfiguration;
     @Resource
     private CloseableHttpClient closeableHttpClient;
@@ -89,101 +78,6 @@ public class TaskUtil {
         }
     }
 
-    /**
-     * 加事务的话高并发情况下会死锁
-     */
-    @SneakyThrows
-    public void isProjectCompleted(PrefixEntity redisPrefix, String projectUserId, String projectId, String projectType, String maxSimulationTime, String taskId, String state, String podName) {
-        boolean isCompleted;
-        String nodeName = projectUtil.getNodeNameOfPod(projectId, podName);
-        final String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
-        if (DictConstants.TASK_RUNNING.equals(state)) {  // 运行中的 pod 无需删除
-            // 将运行中的任务的 pod 名称放入 redis
-            stringRedisTemplate.opsForValue().set(redisPrefix.getTaskPodKey(), podName);
-            taskTick(taskId); // 刷新一下心跳
-            log.info("修改任务 " + taskId + " 的状态为 " + state + ",pod 名称为:" + podName);
-            taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
-            return;
-        } else { // 结束的 pod 都直接删除,并判断项目是否完成
-            // -------------------------------- 处理状态 --------------------------------
-            log.info("修改任务 {} 的状态为 {} ,pod 名称为 {} ,并删除 pod。", taskId, state, podName);
-            if (DictConstants.TASK_ABORTED.equals(state)) {
-                String minioPathOfErrorLog = resultPathMinio + projectId + "/" + taskId + "error.log";
-                boolean objectExist = MinioUtil.isObjectExist(minioClient, bucketName, minioPathOfErrorLog);
-                String targetEvaluate;
-                if (objectExist) {
-                    String errorString = MinioUtil.downloadToString(minioClient, bucketName, minioPathOfErrorLog);
-                    String[] lines = errorString.split("\n");
-                    StringBuilder errorMessage = new StringBuilder();
-                    for (String line : lines) {
-                        if (line.startsWith("Original Error")) {
-                            errorMessage.append(line).append("\n");
-                        }
-                        if (line.startsWith("Possible Cause")) {
-                            errorMessage.append(line);
-                            break;
-                        }
-                    }
-                    targetEvaluate = errorMessage.toString();
-                } else {
-                    targetEvaluate = DictConstants.TASK_ERROR_REASON_2;
-                }
-                taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), targetEvaluate);
-            } else if (DictConstants.TASK_TERMINATED.equals(state)) {
-                taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_3);
-            } else if (DictConstants.TASK_ANALYSIS.equals(state)) { // 该状态只会获得一次
-                taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
-                // 查询项目是否使用 CPU 生成视频
-                if (DictConstants.VIDEO_CPU.equals(isChoiceGpu)) {
-                    String generateVideoKey = "task:" + taskId + ":generateVideo";
-                    customRedisClient.set(generateVideoKey, "0");
-                    HttpUtil.get(customConfiguration.getGenerateVideoUrl().replace("simulation-resource-video", nodeName) + "?generateVideoKey=" + generateVideoKey + "&nodeName=" + nodeName + "&projectId=" + projectId + "&projectType=" + projectType + "&maxSimulationTime=" + maxSimulationTime + "&taskId=" + taskId);
-//                    HttpUtil.get("http://" + nodeName + ":8007//simulation/resource/video/generate" + "?generateVideoKey=" + generateVideoKey + "&nodeName=" + nodeName + "&projectId" + projectId + "&projectType" + projectType + "&maxSimulationTime" + maxSimulationTime + "&taskId" + taskId);
-//                    videoFeignClient.generateVideo(generateVideoKey, nodeName, projectId, projectType, maxSimulationTime, taskId);
-                    log.info("任务 {} 使用 CPU 生成视频开始>>>>>>>", taskId);
-                    while (true) {
-                        TimeUnit.SECONDS.sleep(1);
-                        final String generateVideoValue = customRedisClient.get(generateVideoKey);
-                        if (DictConstants.YES.equals(generateVideoValue)) {
-                            customRedisClient.delete(generateVideoKey);
-                            break;
-                        }
-                    }
-                    log.info("任务 {} 使用 CPU 生成视频结束<<<<<<<", taskId);
-                }
-            }
-            // -------------------------------- 判断项目是否结束 --------------------------------
-            isCompleted = projectUtil.complete(redisPrefix, projectId);
-            if (isCompleted) {
-                //如果项目已完成先把 pod 删除,并归还并行度
-                KubernetesUtil.deletePod2(apiClient, kubernetesConfiguration.getNamespace(), podName);
-                projectUtil.incrementOneParallelism(isChoiceGpu, nodeName);
-                projectUtil.releaseLicense(projectUtil.getClusterUserIdByProjectUserId(projectUserId), projectUtil.getModelTypeByProjectIdAndProjectType(projectId, projectType), 1);
-            } else {
-                log.info("项目 " + projectId + " 还未运行完成。");
-                projectUtil.createNextPod(projectUserId, projectId, projectType, nodeName, podName);
-            }
-            RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getTaskMessageKey());
-            RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getTaskPodKey());
-        }
-        //* -------------------------------- 打分 --------------------------------
-        String lock2 = "project:" + projectId + ":completed-lock";
-        try {
-            if (isCompleted) {
-                customRedisClient.tryLock(lock2, 10 * 60L);
-                log.info("项目 {} 开始打分。", projectId);
-                score(redisPrefix.getProjectRunningKey(), projectUserId, projectId, projectType);
-                log.info("项目 {} 计算评价等级。", projectId);
-                evaluationLevel(projectId);
-                log.info("项目 {} 开始释放资源。", projectId);
-                done(redisPrefix, projectId, projectType);
-                log.info("项目 {} 运行结束。", projectId);
-            }
-        } finally {
-            customRedisClient.unlock(lock2);
-        }
-    }
-
 
     /**
      * @param projectUserId 项目创建用户的 id
@@ -423,7 +317,6 @@ public class TaskUtil {
         log.info("访问仿真云平台评价等级接口:" + customConfiguration.getEvaluationLevelUri() + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
     }
 
-
     public Boolean taskConfirm(String taskId) {
         // 查询 task 如果不是 pending 则不执行
         String state = taskMapper.selectStateById(taskId);
@@ -453,7 +346,7 @@ public class TaskUtil {
             autoSubProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());
         }
         // 删除 kafka topic
-//        ApacheKafkaUtil.deleteTopic(admin, projectId);
+        ApacheKafkaUtil.deleteTopic(admin, projectId);
         // 删除 redis 中的 项目运行信息 键值对
         RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getProjectRunningKey());
         RedisUtil.deleteByPrefix(stringRedisTemplate, "project:" + projectId);