Эх сурвалжийг харах

等待执行的项目信息已经从 redis 删除

martin 2 жил өмнө
parent
commit
2e988a822b

+ 9 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -155,11 +155,17 @@ public class ProjectScheduler {
 
     @SneakyThrows
     public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey, long parallelism) {
-        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(stringRedisTemplate.opsForValue().get(projectWaitingKey), ProjectMessageDTO.class);
-
+        ProjectMessageDTO projectMessageDTO;
+        try {
+            projectMessageDTO = JsonUtil.jsonToBean(stringRedisTemplate.opsForValue().get(projectWaitingKey), ProjectMessageDTO.class);
+        } catch (Exception e) {
+            log.error("ProjectScheduler--run 等待执行的项目信息已经从 redis 删除。");
+            return;
+        }
         //1 获取所有节点的剩余可用并行度
         Map<String, Integer> nodeMap = projectUtil.getNodeMapToUse(parallelism);
         if (CollectionUtil.isEmpty(nodeMap)) {
+            log.info("ProjectScheduler--run 集群中没有可用并行度,项目 " + projectId + " 继续等待。");
             return;
         }
         //2 计算实际可用并行度
@@ -167,7 +173,7 @@ public class ProjectScheduler {
 
         //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
         if (parallelismSum > 0L) {
-            log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点 " + nodeMap + " 上以并行度 " + parallelismSum + " 执行!");
+            log.info("ProjectScheduler--run 集群 " + clusterId + " 将项目 " + projectId + "在节点 " + nodeMap + " 上执行。");
             projectMessageDTO.setCurrentParallelism(parallelismSum);    // 设置实际的并行度
             projectConsumer.parseProject(nodeMap, projectMessageDTO, "cluster:" + clusterId, projectRunningKey);
         }

+ 3 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -180,9 +180,7 @@ public class ProjectUtil {
                 resultNodeMap.put(name, 0);
             }
         }
-        if (CollectionUtil.isEmpty(restNodeList)) {
-            return null;
-        } else {
+        if (!CollectionUtil.isEmpty(restNodeList)) {
             for (int i = 0; i < parallelism; i++) {
                 // 每次降序排序都取剩余并行度最大的一个。
                 restNodeList.sort((o1, o2) -> o2.getMaxParallelism() - o1.getMaxParallelism());
@@ -194,8 +192,9 @@ public class ProjectUtil {
                     resultNodeMap.put(tempNodeName, resultNodeMap.get(tempNodeName) + 1);
                 }
             }
-            return resultNodeMap;
         }
+        log.info("ProjectUtil--getNodeMapToUse 即将使用节点为:" + resultNodeMap);
+        return resultNodeMap;
     }