martin 2 роки тому
батько
коміт
148d689929

+ 4 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -159,7 +159,7 @@ public class ProjectConsumer {
         if (parallelismSum > 0L) {
             log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点 " + nodeMap + " 上执行!");
             projectMessageDTO.setCurrentParallelism(parallelismSum);    // 设置实际的并行度
-            parseProject(nodeMap, projectMessageDTO, "cluster:" + clusterId, projectRunningKey);
+            parseProject(nodeMap, projectMessageDTO, projectWaitingKey, projectRunningKey);
         } else {
             log.info("ProjectConsumer--cacheManualProject 服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
             wait(projectWaitingKey, projectMessageDTO);
@@ -179,11 +179,11 @@ public class ProjectConsumer {
     /**
      * @param nodeMap           节点列表以及剩余可用并行度
      * @param projectMessageDTO 初始接收到的项目启动信息
-     * @param clusterPrefix     clusterPrefix
+     * @param projectWaitingKey projectWaitingKey
      * @param projectRunningKey projectRunningKey
      */
     @SneakyThrows
-    public void parseProject(Map<String, Integer> nodeMap, ProjectMessageDTO projectMessageDTO, String clusterPrefix, String projectRunningKey) {
+    public void parseProject(Map<String, Integer> nodeMap, ProjectMessageDTO projectMessageDTO, String projectWaitingKey, String projectRunningKey) {
         String projectId = projectMessageDTO.getProjectId();    // 项目 id
         String projectType = projectMessageDTO.getType();   // 项目类型
         String packageId = projectMessageDTO.getScenePackageId();   // 场景测试包 id
@@ -191,7 +191,7 @@ public class ProjectConsumer {
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
         // -------------------------------- 0 准备 --------------------------------
-        projectService.prepare(nodeMap, projectMessageDTO, clusterPrefix);
+        projectService.prepare(nodeMap, projectMessageDTO, projectWaitingKey, projectRunningKey);
         String userId = null;
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
             userId = manualProjectMapper.selectCreateUserById(projectId);

+ 13 - 12
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -15,6 +15,7 @@ import com.css.simulation.resource.scheduler.pojo.po.UserPO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.service.TaskService;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
+import com.css.simulation.resource.scheduler.util.RedisUtil;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
@@ -127,18 +128,18 @@ public class ProjectScheduler {
             }
             int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
             // 获取该用户正在运行的项目数量
-            Set<String> clusterRunningKeySet = stringRedisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
-            List<String> runningProjectSet = null;
+            Set<String> keySetOfAllProjectOfCluster = RedisUtil.getKeySetByPrefix(stringRedisTemplate, redisPrefix.getClusterRunningPrefix());
+            List<String> keyListOfRunningProject = null;
             // cluster:${clusterId}:running:${projectId}
-            if (CollectionUtil.isNotEmpty(clusterRunningKeySet)) {
-                runningProjectSet = projectUtil.getRunningProjectList(clusterRunningKeySet);
-                if (CollectionUtil.isNotEmpty(runningProjectSet)) {
-                    log.info("ProjectScheduler--dispatchProject 运行中的项目的 key 有:" + runningProjectSet);
+            if (CollectionUtil.isNotEmpty(keySetOfAllProjectOfCluster)) {
+                keyListOfRunningProject = projectUtil.getRunningProjectList(keySetOfAllProjectOfCluster);   // 筛选出运行中的项目信息 (key 为 cluster:${cluster}:running:${projectId})
+                if (CollectionUtil.isNotEmpty(keyListOfRunningProject)) {
+                    log.info("ProjectScheduler--dispatchProject 运行中的项目的 key 有:" + keyListOfRunningProject);
                     long parallelismSum = 0;
-                    for (String runningProjectKey : runningProjectSet) {
-                        parallelismSum += JsonUtil.jsonToBean(stringRedisTemplate.opsForValue().get(runningProjectKey), ProjectMessageDTO.class).getParallelism();
+                    for (String keyOfRunningProject : keyListOfRunningProject) {
+                        parallelismSum += JsonUtil.jsonToBean(RedisUtil.getStringByKey(stringRedisTemplate, keyOfRunningProject), ProjectMessageDTO.class).getParallelism();
                     }
-                    if (parallelismSum < simulationLicenseNumber) {
+                    if (parallelismSum < simulationLicenseNumber) { // 已经运行的项目小于集群允许运行的项目则,运行新的项目
                         if (parallelismSum + parallelism < simulationLicenseNumber) {
                             run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
                             return;
@@ -146,7 +147,7 @@ public class ProjectScheduler {
                     }
                 }
             }
-            if ((CollectionUtil.isEmpty(clusterRunningKeySet) || CollectionUtil.isEmpty(runningProjectSet)) && parallelism < simulationLicenseNumber) {
+            if ((CollectionUtil.isEmpty(keySetOfAllProjectOfCluster) || CollectionUtil.isEmpty(keyListOfRunningProject)) && parallelism < simulationLicenseNumber) {
                 run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
             }
         }
@@ -157,7 +158,7 @@ public class ProjectScheduler {
     public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey, int parallelism) {
         ProjectMessageDTO projectMessageDTO;
         try {
-            projectMessageDTO = JsonUtil.jsonToBean(stringRedisTemplate.opsForValue().get(projectWaitingKey), ProjectMessageDTO.class);
+            projectMessageDTO = JsonUtil.jsonToBean(RedisUtil.getStringByKey(stringRedisTemplate, projectWaitingKey), ProjectMessageDTO.class);
         } catch (Exception e) {
             log.error("ProjectScheduler--run 等待执行的项目信息已经从 redis 删除。");
             return;
@@ -175,7 +176,7 @@ public class ProjectScheduler {
         if (parallelismSum > 0L) {
             log.info("ProjectScheduler--run 集群 " + clusterId + " 将项目 " + projectId + "在节点 " + nodeMap + " 上执行。");
             projectMessageDTO.setCurrentParallelism(parallelismSum);    // 设置实际的并行度
-            projectConsumer.parseProject(nodeMap, projectMessageDTO, "cluster:" + clusterId, projectRunningKey);
+            projectConsumer.parseProject(nodeMap, projectMessageDTO, projectWaitingKey, projectRunningKey);
         }
     }
 

+ 7 - 14
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -6,10 +6,7 @@ import api.common.util.*;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.*;
-import com.css.simulation.resource.scheduler.util.GitUtil;
-import com.css.simulation.resource.scheduler.util.KubernetesUtil;
-import com.css.simulation.resource.scheduler.util.MinioUtil;
-import com.css.simulation.resource.scheduler.util.ProjectUtil;
+import com.css.simulation.resource.scheduler.util.*;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -84,10 +81,11 @@ public class ProjectService {
     /**
      * @param nodeMap           节点列表以及剩余可用并行度
      * @param projectMessageDTO 初始接收到的项目启动信息
-     * @param clusterPrefix     clusterPrefix
+     * @param projectWaitingKey     projectWaitingKey
+     * @param projectRunningKey     projectRunningKey
      */
     @Transactional
-    public void prepare(Map<String, Integer> nodeMap, ProjectMessageDTO projectMessageDTO, String clusterPrefix) {
+    public void prepare(Map<String, Integer> nodeMap, ProjectMessageDTO projectMessageDTO, String projectWaitingKey,String projectRunningKey) {
         String projectId = projectMessageDTO.getProjectId();
         //1 将指定 node 的并行度减少
         nodeMap.keySet().forEach(nodeName -> {
@@ -97,14 +95,9 @@ public class ProjectService {
             stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelismToUse) + "");
         });
         //2 将 redis 中该项目旧的信息则直接删除(包括 waitingKey)
-        Set<String> oldKeys = stringRedisTemplate.keys(clusterPrefix + "*");
-        if (CollectionUtil.isNotEmpty(oldKeys)) {
-            for (String oldKey : oldKeys) {
-                if (oldKey.contains(projectId)) {
-                    stringRedisTemplate.delete(oldKeys);
-                }
-            }
-        }
+        RedisUtil.deleteByPrefix(stringRedisTemplate,projectWaitingKey);
+        RedisUtil.deleteByPrefix(stringRedisTemplate,projectRunningKey);
+
 
         //5 将该 project 下所有旧的任务和指标得分删除。
         taskMapper.deleteByProject(projectId);

+ 16 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/RedisUtil.java

@@ -2,14 +2,29 @@ package com.css.simulation.resource.scheduler.util;
 
 import org.springframework.data.redis.core.StringRedisTemplate;
 
+import java.util.Collection;
 import java.util.Set;
 
 public class RedisUtil {
 
-    public static void deleteByKey(StringRedisTemplate stringRedisTemplate,String key){
+
+    public static void deleteByKey(StringRedisTemplate stringRedisTemplate, String key) {
         stringRedisTemplate.delete(key);
     }
 
+    public static void deleteByKeyCollection(StringRedisTemplate stringRedisTemplate, Collection<String> keyCollection) {
+        stringRedisTemplate.delete(keyCollection);
+    }
+
+    public static void deleteByPrefix(StringRedisTemplate stringRedisTemplate, String prefix) {
+        Set<String> keySetByPrefix = getKeySetByPrefix(stringRedisTemplate, prefix);
+        deleteByKeyCollection(stringRedisTemplate, keySetByPrefix);
+    }
+
+    public static String getStringByKey(StringRedisTemplate stringRedisTemplate, String key) {
+        return stringRedisTemplate.opsForValue().get(key);
+    }
+
     public static Set<String> getKeySetByPrefix(StringRedisTemplate stringRedisTemplate, String prefix) {
         return stringRedisTemplate.keys(prefix + "*");
     }