martin 2 years ago
parent
commit
daaa7f8b0d

+ 2 - 1
api-common/src/main/java/api/common/pojo/dto/ProjectMessageDTO.java

@@ -13,6 +13,7 @@ import lombok.NoArgsConstructor;
  *  "scenePackageId": "sadfasdfs", // 场景包 id
  *  "scenePackageId": "sadfasdfs", // 场景包 id
  *  "maxSimulationTime": 11111, // 最大仿真时间
  *  "maxSimulationTime": 11111, // 最大仿真时间
  *  "parallelism": 30  // 并行度
  *  "parallelism": 30  // 并行度
+ *  "type": 30  // 并行度
  * }
  * }
  */
  */
 @Data
 @Data
@@ -27,6 +28,6 @@ public class ProjectMessageDTO {
     private String scenePackageId;// 场景包 id
     private String scenePackageId;// 场景包 id
     private Long maxSimulationTime;// 最大仿真时间(秒)
     private Long maxSimulationTime;// 最大仿真时间(秒)
     private Long parallelism;// 并行度,创建 pod 时使用
     private Long parallelism;// 并行度,创建 pod 时使用
-    private String projectType;// 项目类型
+    private String type;// 项目类型
 
 
 }
 }

+ 8 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -95,7 +95,7 @@ public class ProjectConsumer {
         ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
         ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
         String projectId = projectMessageDTO.getProjectId();    // 手动执行项目 id 或 自动执行子项目 id
         String projectId = projectMessageDTO.getProjectId();    // 手动执行项目 id 或 自动执行子项目 id
         Long parallelism = projectMessageDTO.getParallelism();   // 项目并行度
         Long parallelism = projectMessageDTO.getParallelism();   // 项目并行度
-        String projectType = projectMessageDTO.getProjectType(); // 项目类型
+        String projectType = projectMessageDTO.getType(); // 项目类型
         //2 根据 projectId 获取创建用户 id
         //2 根据 projectId 获取创建用户 id
         String userId;
         String userId;
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
@@ -137,7 +137,13 @@ public class ProjectConsumer {
         int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
         int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
         // 获取该集群中正在运行的项目,如果没有则立即执行
         // 获取该集群中正在运行的项目,如果没有则立即执行
         PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
         PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
-        Set<String> runningProjectSet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
+        Set<String> clusterRunningKeySet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
+        List<String> runningProjectSet;
+        if (CollectionUtil.isEmpty(clusterRunningKeySet)) {
+            run(clusterId, projectId, redisPrefix.getProjectRunningKey(), projectJson);
+            return;
+        }
+        runningProjectSet = projectUtil.getRunningProjectList(clusterRunningKeySet);
         if (CollectionUtil.isEmpty(runningProjectSet)) {
         if (CollectionUtil.isEmpty(runningProjectSet)) {
             run(clusterId, projectId, redisPrefix.getProjectRunningKey(), projectJson);
             run(clusterId, projectId, redisPrefix.getProjectRunningKey(), projectJson);
             return;
             return;

+ 20 - 24
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -30,7 +30,6 @@ import org.springframework.stereotype.Component;
 
 
 import java.util.List;
 import java.util.List;
 import java.util.Set;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 
 @Component
 @Component
 @Slf4j
 @Slf4j
@@ -91,36 +90,33 @@ public class ProjectScheduler {
             }
             }
             int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
             int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
             // 获取该用户正在运行的项目数量
             // 获取该用户正在运行的项目数量
-            Set<String> keySet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
-            if (CollectionUtil.isEmpty(keySet)) {
-                log.error("ProjectScheduler--dispatchProject 正在运行的项目 " + projectId + " 没有对应的缓存数据!");
-                return;
-            }
+            Set<String> clusterRunningKeySet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
+            List<String> runningProjectSet;
             // cluster:${clusterId}:running:${projectId}
             // cluster:${clusterId}:running:${projectId}
-            List<String> runningProjectSet = keySet.stream().filter(key -> StringUtil.countSubString(key, ":") == 3).collect(Collectors.toList());
-            if (CollectionUtil.isNotEmpty(runningProjectSet)) {
-                long parallelismSum = 0;
-                for (String runningProjectKey : runningProjectSet) {
-                    parallelismSum += JsonUtil.jsonToBean(redisTemplate.opsForValue().get(runningProjectKey), ProjectMessageDTO.class).getParallelism();
-                }
-                if (parallelismSum < simulationLicenseNumber) {
-                    Set<String> waitingProjectSet = redisTemplate.keys(redisPrefix.getClusterWaitingPrefix() + "*");
-                    if (CollectionUtil.isEmpty(waitingProjectSet)) {
-                        return;
+            if (CollectionUtil.isNotEmpty(clusterRunningKeySet)) {
+                runningProjectSet = projectUtil.getRunningProjectList(clusterRunningKeySet);
+                if (CollectionUtil.isNotEmpty(runningProjectSet)) {
+                    long parallelismSum = 0;
+                    for (String runningProjectKey : runningProjectSet) {
+                        parallelismSum += JsonUtil.jsonToBean(redisTemplate.opsForValue().get(runningProjectKey), ProjectMessageDTO.class).getParallelism();
                     }
                     }
-                    for (String waitingProjectKey : waitingProjectSet) {
-                        Long parallelism = JsonUtil.jsonToBean(redisTemplate.opsForValue().get(waitingProjectKey), ProjectMessageDTO.class).getParallelism();
-                        if (parallelismSum + parallelism < simulationLicenseNumber) {
-                            run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
+                    if (parallelismSum < simulationLicenseNumber) {
+                        Set<String> waitingProjectSet = redisTemplate.keys(redisPrefix.getClusterWaitingPrefix() + "*");
+                        if (CollectionUtil.isEmpty(waitingProjectSet)) {
+                            return;
+                        }
+                        for (String waitingProjectKey : waitingProjectSet) {
+                            Long parallelism = JsonUtil.jsonToBean(redisTemplate.opsForValue().get(waitingProjectKey), ProjectMessageDTO.class).getParallelism();
+                            if (parallelismSum + parallelism < simulationLicenseNumber) {
+                                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
+                                return;
+                            }
                         }
                         }
                     }
                     }
                 }
                 }
-            } else {
-                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
             }
             }
-
+            run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
         }
         }
-
     }
     }
 
 
     public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey) {
     public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey) {

+ 10 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -1,6 +1,7 @@
 package com.css.simulation.resource.scheduler.util;
 package com.css.simulation.resource.scheduler.util;
 
 
 import api.common.pojo.constants.DictConstants;
 import api.common.pojo.constants.DictConstants;
+import api.common.util.StringUtil;
 import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.mapper.UserMapper;
 import com.css.simulation.resource.scheduler.mapper.UserMapper;
 import com.css.simulation.resource.scheduler.pojo.po.UserPO;
 import com.css.simulation.resource.scheduler.pojo.po.UserPO;
@@ -9,6 +10,10 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
 /**
  * 任务结果打分
  * 任务结果打分
  */
  */
@@ -122,4 +127,9 @@ public class ProjectUtil {
     }
     }
 
 
 
 
+    public List<String> getRunningProjectList(Set<String> clusterRunningKeySet) {
+        return clusterRunningKeySet.stream().filter(key -> StringUtil.countSubString(key, ":") == 3).collect(Collectors.toList());
+    }
+
+
 }
 }