martin hace 3 años
padre
commit
94675cd060

+ 20 - 8
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -97,6 +97,7 @@ public class ManualProjectConsumer {
         int maxSimulationTime = projectMessageDTO.getMaxSimulationTime(); // 最大仿真时间,即生成视频的时间长度
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
+        int parallelism = projectMessageDTO.getParallelism();
         //2 根据 projectId 获取创建用户 id
         String userId = manualProjectMapper.selectCreateUserById(projectId);
         //3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
@@ -124,16 +125,27 @@ public class ManualProjectConsumer {
         // 获取拥有的节点数量,即仿真软件证书数量
         String clusterId = clusterPO.getId();
         int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
-        // 获取该集群中正在运行的项目数量
+        // 获取该集群中正在运行的项目
         Set<String> runningProjectSet = redisTemplate.keys(manualProjectTopic + ":cluster:" + clusterId + ":running" + "*");
-        int runningProjectNumber = CollectionUtil.isEmpty(runningProjectSet) ? 0 : runningProjectSet.size();
-        if (runningProjectNumber < simulationLicenseNumber) {
-            log.info("ManualProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 执行!");
-            redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":running" + projectId, projectJson);
-            parseManualProject(projectJson);
+        // 根据项目 json 获取每个项目的并行度
+        if (CollectionUtil.isNotEmpty(runningProjectSet)) {
+            long parallelismSum = 0;
+            for (String projectKey : runningProjectSet) {
+                String projectJsonTemp = redisTemplate.opsForValue().get(projectKey);
+                ProjectMessageDTO projectMessageTemp = JsonUtil.jsonToBean(projectJsonTemp, ProjectMessageDTO.class);
+                parallelismSum += projectMessageTemp.getParallelism();
+            }
+            if (parallelismSum + parallelism <= simulationLicenseNumber) {
+                log.info("ManualProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 执行!");
+                redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":running:" + projectId, projectJson);
+                parseManualProject(projectJson);
+            } else {
+                log.info("ManualProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
+                redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":waiting:" + projectId, projectJson);
+            }
         } else {
             log.info("ManualProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
-            redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":waiting" + projectId, projectJson);
+            redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":waiting:" + projectId, projectJson);
         }
     }
 
@@ -156,6 +168,7 @@ public class ManualProjectConsumer {
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
         String userId = manualProjectMapper.selectCreateUserById(projectId);
+        int parallelism = projectMessageDTO.getParallelism();    // 并行度
         //2 执行前准备,删除改项目下所有任务,即重新执行改项目时需要新的测试包
         manualProjectService.prepare(manualProjectTopic, userId, projectId, projectJson);
         // -------------------------------- 1 查询场景 --------------------------------
@@ -173,7 +186,6 @@ public class ManualProjectConsumer {
         String algorithmDockerImage = manualProjectService.handleAlgorithm(projectId, algorithmId);
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
         int completions = scenePOList.size();     // 结束标
-        int parallelism = projectMessageDTO.getParallelism();    // 并行度
         log.info("------- ManualProjectConsumer 项目 " + projectId + " 的完成度为:" + completions);
         log.info("------- ManualProjectConsumer 项目 " + projectId + " 的并行度为:" + parallelism);
         String jobTemplateYamlPathSource = jobTemplate + "job-template.yaml";

+ 17 - 10
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -183,6 +183,10 @@ public class TaskManager {
         ProjectPO projectPO = manualProjectMapper.selectById(projectId);
         String packageId = projectPO.getScenePackageId();  // 场景测试包 id,指标根 id
         List<TaskPO> taskList = taskMapper.selectTaskListByProjectId(projectId);  // 所有任务信息
+        if (CollectionUtil.isEmpty(taskList)) {
+            log.error("TaskManager--score 项目 " + projectId + " 下没有查询到任务!");
+            return;
+        }
         indexMapper.deleteFirstByProjectId(projectId);
         indexMapper.deleteLastByProjectId(projectId);
         //1 查询场景包对应指标
@@ -190,7 +194,7 @@ public class TaskManager {
         List<IndexTemplatePO> allIndexTemplateList = JsonUtil.jsonToList(allIndexTemplateListJson, IndexTemplatePO.class);
         String leafIndexTemplateListJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":package:" + packageId + ":leaf");
         List<IndexTemplatePO> leafIndexTemplateList = JsonUtil.jsonToList(leafIndexTemplateListJson, IndexTemplatePO.class);
-        log.info("TaskService--state 共有 " + leafIndexTemplateList.size() + "个叶子节点:" + leafIndexTemplateListJson);
+        log.info("TaskManager--score 共有 " + leafIndexTemplateList.size() + "个叶子节点:" + leafIndexTemplateListJson);
         int maxLevel = 1; // 用于计算指标得分
         List<LeafIndexPO> leafIndexList = new ArrayList<>();
         for (int i = 0; i < leafIndexTemplateList.size(); i++) {
@@ -261,24 +265,27 @@ public class TaskManager {
                     taskOfLeaf.setRunState(DictConstants.TASK_ABORTED);
                     taskOfLeaf.setScore(0.0);
                     taskMapper.updateFailStateWithStopTime(task2Id, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_5);
-                    log.error(e.getMessage());
+                    log.error("TaskManager--score 任务 " + task2Id + " 打分出错!", e.getMessage());
                     // 如果打分失败则开始下一个打分
                 }
             });
 
 
             // 计算不合格的任务数(不到100分就是不合格,执行失败的不算)
-            long notStandardSceneNumber = taskListOfLeafIndex.stream()
-                    .filter(task -> task.getScore() < 100)
-                    .count();
-
             // 计算叶子指标下任务得分总和
-            double leafSum = taskListOfLeafIndex.stream()
-                    .mapToDouble(TaskPO::getScore)
-                    .sum();
+            long notStandardSceneNumber = 0;
+            double leafSum = 0;
+            for (TaskPO task : taskListOfLeafIndex) {
+                Double scoreTemp = task.getScore();
+                leafSum += scoreTemp;
+                if (scoreTemp < 100.0) {
+                    notStandardSceneNumber++;
+                }
+            }
+
             // 计算任务的个数
             long resultNumberOfCurrentIndex = taskListOfLeafIndex.size();
-            log.info("TaskService--state 项目 " + projectId + " 的叶子指标" + indexId + "下成功执行的场景数量为:" + resultNumberOfCurrentIndex);
+            log.info("TaskManager--score 项目 " + projectId + " 的叶子指标" + indexId + "下成功执行的场景数量为:" + resultNumberOfCurrentIndex);
             // 计算叶子指标得分(任务得分总和 / 任务数量)
             double leafIndexScore = resultNumberOfCurrentIndex == 0 ? 0 : NumberUtil.cut(leafSum / resultNumberOfCurrentIndex, 2);
             // 创建叶子指标对象

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/ManualProjectMapper.java

@@ -17,12 +17,12 @@ public interface ManualProjectMapper {
             @Result(column = "scene", property = "scenePackageId", jdbcType = JdbcType.VARCHAR),
             @Result(column = "create_user_id", property = "createUserId", jdbcType = JdbcType.VARCHAR)
     })
-
     @Select("select id, scene, create_user_id\n" +
             "from simulation_manual_project\n" +
             "where id = #{projectId}")
     ProjectPO selectById(@Param("projectId")String projectId);
 
+    @ResultMap("project")
     @Select("select id, scene, create_user_id\n" +
             "from simulation_manual_project\n" +
             "where is_deleted = '0'\n" +

+ 38 - 22
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -1,10 +1,8 @@
 package com.css.simulation.resource.scheduler.scheduler;
 
 import api.common.pojo.constants.DictConstants;
-import api.common.util.CollectionUtil;
-import api.common.util.SshUtil;
-import api.common.util.StringUtil;
-import api.common.util.TimeUtil;
+import api.common.pojo.dto.ProjectMessageDTO;
+import api.common.util.*;
 import com.css.simulation.resource.scheduler.consumer.ManualProjectConsumer;
 import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
@@ -14,6 +12,7 @@ import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.service.TaskService;
 import io.kubernetes.client.openapi.ApiClient;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
@@ -62,35 +61,52 @@ public class ProjectScheduler {
 
     /**
      * Scheduled dispatcher that promotes waiting projects to the running state.
-     *
-     * @throws IOException 超时时间
+     * <p>
+     * For every project whose state is PROJECT_WAITING, the parallelism of all projects cached
+     * under the owning cluster's "running" keys is summed. The project is started when that sum
+     * plus the project's own parallelism does not exceed the cluster's simulation license count.
+     * A project that cannot be dispatched is skipped, so it never blocks the remaining ones.
      */
     @Scheduled(fixedDelay = 60 * 1000)
-    public void dispatchProject() throws IOException {
+    @SneakyThrows
+    public void dispatchProject() {
         //1 Query the projects that are waiting to be executed
         List<ProjectPO> projectList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_WAITING);
-        //2
-        if (CollectionUtil.isEmpty(projectList)) {
-            return;
-        }
-        projectList.forEach(project -> {
+        for (ProjectPO project : projectList) {
             String projectId = project.getId();
             String userId = project.getCreateUserId();
             ClusterPO clusterPO = clusterMapper.selectByUserId(userId);
+            if (clusterPO == null) {
+                log.error("ProjectScheduler--dispatchProject 项目 " + projectId + " 的创建用户 " + userId + " 没有分配集群!");
+                // Skip only this project; a bare return here would abandon every remaining project.
+                continue;
+            }
             String clusterId = clusterPO.getId();
             int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
             // Sum the parallelism of the projects currently running on this cluster
-            Set<String> runningProjectSet = redisTemplate.keys(manualProjectTopic + ":cluster:" + clusterId + ":running" + "*");
-            int runningProjectNumber = CollectionUtil.isEmpty(runningProjectSet) ? 0 : runningProjectSet.size();
-            if (runningProjectNumber < simulationLicenseNumber) {
-                String projectJson = redisTemplate.opsForValue().get(manualProjectTopic + ":cluster:" + clusterId + ":waiting" + projectId);
-                assert projectJson != null;
-                redisTemplate.delete(manualProjectTopic + ":cluster:" + clusterId + ":waiting" + projectId);
-                redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":running" + projectId, projectJson);
-                manualProjectConsumer.parseManualProject(projectJson);
+            Set<String> runningProjectSet = redisTemplate.keys(manualProjectTopic + ":cluster:" + clusterId + ":running:" + "*");
+            // Stays 0 when nothing is running, so an idle cluster can still start a waiting project.
+            long parallelismSum = 0;
+            if (CollectionUtil.isNotEmpty(runningProjectSet)) {
+                for (String runningProjectKey : runningProjectSet) {
+                    String runningProjectJsonTemp = redisTemplate.opsForValue().get(runningProjectKey);
+                    if (runningProjectJsonTemp == null) {
+                        // The key vanished between keys() and get(); it no longer occupies a license.
+                        continue;
+                    }
+                    ProjectMessageDTO runningProjectMessageTemp = JsonUtil.jsonToBean(runningProjectJsonTemp, ProjectMessageDTO.class);
+                    parallelismSum += runningProjectMessageTemp.getParallelism();
+                }
+            }
+            // Load the cached message of the project that is being dispatched
+            String waitingProjectKey = manualProjectTopic + ":cluster:" + clusterId + ":waiting:" + projectId;
+            String projectJson = redisTemplate.opsForValue().get(waitingProjectKey);
+            if (projectJson == null) {
+                // Check before touching Redis instead of asserting after the delete.
+                log.error("ProjectScheduler--dispatchProject 项目 " + projectId + " 不在等待队列中!");
+                continue;
+            }
+            ProjectMessageDTO waitingProjectMessage = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
+            int parallelism = waitingProjectMessage.getParallelism();
+            // "<=" matches the admission check applied when a project message is first cached.
+            if (parallelismSum + parallelism <= simulationLicenseNumber) {
+                redisTemplate.delete(waitingProjectKey);
+                redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":running:" + projectId, projectJson);
+                log.info("ProjectScheduler--dispatchProject 项目 " + projectId + " 从等待队列进入执行状态!");
+                manualProjectConsumer.parseManualProject(projectJson);
             }
-        });
-
+        }
 
     }