root 2 года назад
Родитель
Commit
c299188824

+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/configuration/esmini/EsminiConfiguration.java

@@ -1,6 +1,6 @@
 package com.css.simulation.resource.scheduler.configuration.esmini;
 
-import com.css.simulation.resource.scheduler.entity.EsminiNodeEntity;
+import com.css.simulation.resource.scheduler.entity.CpuNodeEntity;
 import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
@@ -11,5 +11,5 @@ import java.util.List;
 @Configuration
 @ConfigurationProperties(prefix = "esmini")
 public class EsminiConfiguration {
-    private List<EsminiNodeEntity> nodeList;
+    private List<CpuNodeEntity> nodeList;
 }

+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/configuration/kubernetes/KubernetesConfiguration.java

@@ -1,6 +1,6 @@
 package com.css.simulation.resource.scheduler.configuration.kubernetes;
 
-import com.css.simulation.resource.scheduler.entity.KubernetesNodeEntity;
+import com.css.simulation.resource.scheduler.entity.GpuNodeEntity;
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.util.ClientBuilder;
 import io.kubernetes.client.util.KubeConfig;
@@ -29,7 +29,7 @@ public class KubernetesConfiguration {
     private String commandVtdCarsimNogpu;
     private String carsimImage;
     private String carsimCommand;
-    private List<KubernetesNodeEntity> nodeList;
+    private List<GpuNodeEntity> nodeList;
 
     @Bean
     @SneakyThrows

+ 16 - 9
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/configuration/redis/CustomRedisClient.java

@@ -17,6 +17,21 @@ public class CustomRedisClient {
     @Resource
     private StringRedisTemplate stringRedisTemplate;
 
+    public String get(String key) {
+        return stringRedisTemplate.opsForValue().get(key);
+    }
+
+    public void set(String key, String value) {
+         stringRedisTemplate.opsForValue().set(key, value);
+    }
+
+    public void flushdb() {
+        Set<String> keys = stringRedisTemplate.keys("*");
+        if (CollectionUtil.isNotEmpty(keys)) {
+            stringRedisTemplate.delete(keys);
+        }
+    }
+
     public void deleteByKey(String key) {
         stringRedisTemplate.delete(key);
     }
@@ -33,20 +48,12 @@ public class CustomRedisClient {
     public String getStringByKey(String key) {
         return stringRedisTemplate.opsForValue().get(key);
     }
-    public String get(String key) {
-        return stringRedisTemplate.opsForValue().get(key);
-    }
+
 
     public Set<String> getKeySetByPrefix(String prefix) {
         return stringRedisTemplate.keys(prefix + "*");
     }
 
-    public void flushdb() {
-        Set<String> keys = stringRedisTemplate.keys("*");
-        if (CollectionUtil.isNotEmpty(keys)) {
-            stringRedisTemplate.delete(keys);
-        }
-    }
 
     public Boolean getDistributedLock(String key) {
         return stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10L, TimeUnit.SECONDS);

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -501,7 +501,7 @@ public class ProjectConsumer {
         //2 将指定 node 的并行度减少
         nodeMap.keySet().forEach(nodeName -> {
             int parallelismToUse = nodeMap.get(nodeName);
-            String restParallelismKey = "node:" + nodeName + ":parallelism";
+            String restParallelismKey = "gpu-node:" + nodeName + ":parallelism";
             int restParallelism = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(restParallelismKey))); // 剩余可用并行度
             stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelismToUse) + "");
         });

+ 4 - 8
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/entity/KubernetesNodeEntity.java → simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/entity/CpuNodeEntity.java

@@ -9,19 +9,15 @@ import lombok.NoArgsConstructor;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class KubernetesNodeEntity implements Cloneable {
+public class CpuNodeEntity implements Cloneable {
     private String name;
-    private Integer maxParallelism;
-//    private String hostname;
-//    private String username;
-//    private String password;
-
+    private Integer parallelism;
 
     @Override
-    public KubernetesNodeEntity clone() {
+    public CpuNodeEntity clone() {
         try {
             // TODO: copy mutable state here, so the clone can't change the internals of the original
-            return (KubernetesNodeEntity) super.clone();
+            return (CpuNodeEntity) super.clone();
         } catch (CloneNotSupportedException e) {
             throw new AssertionError();
         }

+ 4 - 7
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/entity/EsminiNodeEntity.java → simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/entity/GpuNodeEntity.java

@@ -9,19 +9,16 @@ import lombok.NoArgsConstructor;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class EsminiNodeEntity implements Cloneable {
+public class GpuNodeEntity implements Cloneable {
     private String name;
-    private Integer maxParallelism;
-//    private String hostname;
-//    private String username;
-//    private String password;
+    private Integer parallelism;
 
 
     @Override
-    public EsminiNodeEntity clone() {
+    public GpuNodeEntity clone() {
         try {
             // TODO: copy mutable state here, so the clone can't change the internals of the original
-            return (EsminiNodeEntity) super.clone();
+            return (GpuNodeEntity) super.clone();
         } catch (CloneNotSupportedException e) {
             throw new AssertionError();
         }

+ 1 - 10
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -159,7 +159,7 @@ public class TaskManager {
             } else {
                 //如果项目已完成先把 pod 删除,并归还并行度
                 KubernetesUtil.deletePod2(apiClient, kubernetesConfiguration.getNamespace(), podName);
-                projectUtil.addOneParallelismToNode(nodeName);
+                projectUtil.returnOneParallelismToGpuNode(nodeName);
             }
             RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getTaskMessageKey());
             RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getTaskPodKey());
@@ -472,7 +472,6 @@ public class TaskManager {
 
     @SneakyThrows
     public void done(PrefixEntity redisPrefix, String projectId, String projectType) {
-
         // 更新项目状态为已完成
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
             manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());
@@ -481,19 +480,11 @@ public class TaskManager {
         }
         // 删除 kafka topic
         ApacheKafkaUtil.deleteTopic(admin, projectId);
-
         // 删除 redis 中的 项目运行信息 键值对
         RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getProjectRunningKey());
         RedisUtil.deleteByPrefix(stringRedisTemplate, "project:" + projectId);
         // 删除剩余 yaml
         projectUtil.deleteYamlByProjectId(projectId);
-        if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-            log.info("手动运行项目 " + projectId + " 执行完成!");
-        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-            log.info("自动运行子项目 " + projectId + " 执行完成!");
-        }
-
-
     }
 
 

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -316,7 +316,7 @@ public class ProjectService {
                 // 删除 pod
                 projectUtil.deletePod(podName);
                 // 节点并行度加一
-                projectUtil.addOneParallelismToNode(nodeName);
+                projectUtil.returnOneParallelismToGpuNode(nodeName);
             }
         }
 

+ 5 - 5
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -38,20 +38,20 @@ public class TaskService {
         String maxSimulationTime = projectEntity.getMaxSimulationTime();  // 项目类型
         String userId = taskEntity.getCreateUserId();   // 用户 id
         PrefixEntity redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaskId(userId, projectId, taskId);  // 项目前缀
-        //1 判断项目是否已完成
+        log.info("判断项目是否已完成。");
         boolean projectCompleted = taskManager.isProjectCompleted(redisPrefix, projectId, projectType, maxSimulationTime, taskId, state, podName);
         if (!projectCompleted) {
             return;
         }
-        //2 准备打分
         log.info("项目 {} 准备打分。", projectId);
         taskManager.prepareScore(redisPrefix.getProjectRunningKey());
-        //3 打分
+        log.info("项目 {} 开始打分。", projectId);
         taskManager.score(userId, projectId, projectType);
-        //4 调用 server 的接口,计算评价等级
+        log.info("项目 {} 计算评价等级。", projectId);
         taskManager.evaluationLevel(projectId);
-        //5 结束
+        log.info("项目 {} 开始释放资源。", projectId);
         taskManager.done(redisPrefix, projectId, projectType);
+        log.info("项目 {} 运行结束。", projectId);
 
     }
 

+ 54 - 49
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -3,17 +3,14 @@ package com.css.simulation.resource.scheduler.util;
 import api.common.pojo.constants.DictConstants;
 import api.common.pojo.dto.ProjectMessageDTO;
 import api.common.util.*;
+import com.css.simulation.resource.scheduler.configuration.esmini.EsminiConfiguration;
 import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
 import com.css.simulation.resource.scheduler.configuration.redis.CustomRedisClient;
-import com.css.simulation.resource.scheduler.entity.ProjectEntity;
-import com.css.simulation.resource.scheduler.entity.UserEntity;
+import com.css.simulation.resource.scheduler.entity.*;
 import com.css.simulation.resource.scheduler.mapper.AutoSubProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.UserMapper;
-import com.css.simulation.resource.scheduler.entity.KubernetesNodeEntity;
-import com.css.simulation.resource.scheduler.entity.NodeEntity;
-import com.css.simulation.resource.scheduler.entity.PrefixEntity;
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.openapi.ApiException;
 import lombok.SneakyThrows;
@@ -55,6 +52,8 @@ public class ProjectUtil {
     @Resource
     private ClusterMapper clusterMapper;
     @Resource
+    private EsminiConfiguration esminiConfiguration;
+    @Resource
     private KubernetesConfiguration kubernetesConfiguration;
     @Resource
     private ApiClient apiClient;
@@ -148,7 +147,7 @@ public class ProjectUtil {
         if (CollectionUtil.isEmpty(yamlPathCacheKeySet)) {
             // 如果当前节点没有下一个yaml,则返回一个并行度。
             log.info("createNextPod3() 节点 " + nodeName + " 已经执行完被分配的项目 " + projectId + " 的所有 pod。");
-            addOneParallelismToNode(nodeName);
+            returnOneParallelismToGpuNode(nodeName);
         } else {
             final String yamlPathCacheKey = new ArrayList<>(yamlPathCacheKeySet).get(0);
             final String absolutePath = stringRedisTemplate.opsForValue().get(yamlPathCacheKey);
@@ -242,14 +241,14 @@ public class ProjectUtil {
      * @return 节点映射(节点名,并行度)
      */
     public Map<String, Integer> getNodeMap() {
-        List<KubernetesNodeEntity> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
+        List<GpuNodeEntity> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
         log.info("预设并行度的节点列表为:" + initialNodeList);
         Map<String, Integer> resultNodeMap = new HashMap<>();    // 用于执行的节点映射(节点名,并行度)
-        for (KubernetesNodeEntity kubernetesNodeSource : initialNodeList) {
-            KubernetesNodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
+        for (GpuNodeEntity kubernetesNodeSource : initialNodeList) {
+            GpuNodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
             String nodeName = kubernetesNodeCopy.getName();
-            int maxParallelism = kubernetesNodeCopy.getMaxParallelism();
-            String restParallelismKey = "node:" + nodeName + ":parallelism";
+            int maxParallelism = kubernetesNodeCopy.getParallelism();
+            String restParallelismKey = "gpu-node:" + nodeName + ":parallelism";
             String restParallelismString = stringRedisTemplate.opsForValue().get(restParallelismKey);
             int restParallelism;
             if (restParallelismString == null) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
@@ -271,29 +270,29 @@ public class ProjectUtil {
      * @return 集群剩余并行度
      */
     public int getRestParallelism() {
-        List<KubernetesNodeEntity> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
+        List<GpuNodeEntity> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
         // 遍历所有节点,获取还有剩余并行度的节点
-        List<KubernetesNodeEntity> restNodeList = new ArrayList<>();    // 剩余并行度的节点列表
-        for (KubernetesNodeEntity kubernetesNodeSource : initialNodeList) {
-            KubernetesNodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
+        List<GpuNodeEntity> restNodeList = new ArrayList<>();    // 剩余并行度的节点列表
+        for (GpuNodeEntity kubernetesNodeSource : initialNodeList) {
+            GpuNodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
             String nodeName = kubernetesNodeCopy.getName();   // 节点名称
-            int maxParallelism = kubernetesNodeCopy.getMaxParallelism();
-            String restParallelismString = stringRedisTemplate.opsForValue().get("node:" + nodeName + ":parallelism");// 获取节点剩余并行度的 key
+            int maxParallelism = kubernetesNodeCopy.getParallelism();
+            String restParallelismString = stringRedisTemplate.opsForValue().get("gpu-node:" + nodeName + ":parallelism");// 获取节点剩余并行度的 key
             // -------------------------------- Comment --------------------------------
             int restParallelism;
             if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
                 restParallelism = maxParallelism;
-                stringRedisTemplate.opsForValue().set("node:" + nodeName + ":parallelism", restParallelism + "");
+                stringRedisTemplate.opsForValue().set("gpu-node:" + nodeName + ":parallelism", restParallelism + "");
             } else {
                 restParallelism = Integer.parseInt(restParallelismString);
-                kubernetesNodeCopy.setMaxParallelism(restParallelism);
+                kubernetesNodeCopy.setParallelism(restParallelism);
             }
             if (restParallelism > 0) {
                 restNodeList.add(kubernetesNodeCopy);
             }
         }
         log.info("ProjectUtil--getRestParallelism 集群剩余并行度为:" + restNodeList);
-        return restNodeList.size() == 0 ? 0 : restNodeList.stream().mapToInt(KubernetesNodeEntity::getMaxParallelism).sum();
+        return restNodeList.size() == 0 ? 0 : restNodeList.stream().mapToInt(GpuNodeEntity::getParallelism).sum();
     }
 
     /**
@@ -303,23 +302,23 @@ public class ProjectUtil {
      * @return 节点映射(节点名,并行度)
      */
     public Map<String, Integer> getNodeMapToUse(int parallelism) {
-        List<KubernetesNodeEntity> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
+        List<GpuNodeEntity> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
         log.info("预设并行度的节点列表为:" + initialNodeList);
         // 遍历所有节点,获取还有剩余并行度的节点
-        List<KubernetesNodeEntity> restNodeList = new ArrayList<>();    // 剩余并行度的节点列表
-        for (KubernetesNodeEntity kubernetesNodeSource : initialNodeList) {
-            KubernetesNodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
+        List<GpuNodeEntity> restNodeList = new ArrayList<>();    // 剩余并行度的节点列表
+        for (GpuNodeEntity kubernetesNodeSource : initialNodeList) {
+            GpuNodeEntity kubernetesNodeCopy = kubernetesNodeSource.clone();
             String nodeName = kubernetesNodeCopy.getName();   // 节点名称
-            int maxParallelism = kubernetesNodeCopy.getMaxParallelism();
-            String restParallelismString = stringRedisTemplate.opsForValue().get("node:" + nodeName + ":parallelism");// 获取节点剩余并行度的 key
+            int maxParallelism = kubernetesNodeCopy.getParallelism();
+            String restParallelismString = stringRedisTemplate.opsForValue().get("gpu-node:" + nodeName + ":parallelism");// 获取节点剩余并行度的 key
             // -------------------------------- Comment --------------------------------
             int restParallelism;
             if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
                 restParallelism = maxParallelism;
-                stringRedisTemplate.opsForValue().set("node:" + nodeName + ":parallelism", restParallelism + "");
+                stringRedisTemplate.opsForValue().set("gpu-node:" + nodeName + ":parallelism", restParallelism + "");
             } else {
                 restParallelism = Integer.parseInt(restParallelismString);
-                kubernetesNodeCopy.setMaxParallelism(restParallelism);
+                kubernetesNodeCopy.setParallelism(restParallelism);
             }
             if (restParallelism > 0) {
                 restNodeList.add(kubernetesNodeCopy);
@@ -329,20 +328,20 @@ public class ProjectUtil {
         Map<String, Integer> resultNodeMap = new HashMap<>();    // 用于执行的节点映射(节点名,并行度)
         if (!CollectionUtil.isEmpty(restNodeList)) {
             if (restNodeList.size() == 1) {
-                KubernetesNodeEntity tempNode = restNodeList.get(0);
+                GpuNodeEntity tempNode = restNodeList.get(0);
                 String tempNodeName = tempNode.getName();
-                int tempParallelism = tempNode.getMaxParallelism();
+                int tempParallelism = tempNode.getParallelism();
                 resultNodeMap.put(tempNodeName, Math.min(tempParallelism, parallelism));
             }
             if (restNodeList.size() > 1) {
                 for (int i = 0; i < parallelism; i++) {
                     // 每次降序排序都取剩余并行度最大的一个。
-                    restNodeList.sort((o1, o2) -> o2.getMaxParallelism() - o1.getMaxParallelism());
-                    KubernetesNodeEntity tempNode = restNodeList.get(0);
+                    restNodeList.sort((o1, o2) -> o2.getParallelism() - o1.getParallelism());
+                    GpuNodeEntity tempNode = restNodeList.get(0);
                     String tempNodeName = tempNode.getName();
-                    int tempParallelism = tempNode.getMaxParallelism();
+                    int tempParallelism = tempNode.getParallelism();
                     if (tempParallelism > 0) {
-                        tempNode.setMaxParallelism(tempParallelism - 1);
+                        tempNode.setParallelism(tempParallelism - 1);
                         CollectionUtil.addValueToMap(resultNodeMap, 1, tempNodeName);
                     }
                 }
@@ -477,8 +476,8 @@ public class ProjectUtil {
      * @param nodeName 节点名称
      */
     @Synchronized
-    public void addOneParallelismToNode(String nodeName) {
-        String key = "node:" + nodeName + ":parallelism";
+    public void returnOneParallelismToGpuNode(String nodeName) {
+        String key = "gpu-node:" + nodeName + ":parallelism";
         String parallelismString = stringRedisTemplate.opsForValue().get(key);
         if (StringUtil.isEmpty(parallelismString)) {
             throw new RuntimeException("redisKey " + key + " 为空。");
@@ -486,15 +485,26 @@ public class ProjectUtil {
         final int parallelismBefore = Integer.parseInt(parallelismString);
         final int parallelismAfter = parallelismBefore + 1;
         stringRedisTemplate.opsForValue().set(key, parallelismAfter + "");
-        log.info("归还节点 " + nodeName + " 并行度:" + parallelismBefore + " --> " + parallelismAfter);
+        log.info("归还节点 " + nodeName + " 的 GPU 并行度:" + parallelismBefore + " --> " + parallelismAfter);
     }
 
-    public void parallelismAddOne(String nodeName) {
-
-    }
-
-    public void parallelismReduceOne(String nodeName) {
 
+    /**
+     * 将 redis 中的变量 +1,需要保证同步
+     *
+     * @param nodeName 节点名称
+     */
+    @Synchronized
+    public void returnOneParallelismToCpuNode(String nodeName) {
+        String key = "cpu-node:" + nodeName + ":parallelism";
+        String parallelismString = stringRedisTemplate.opsForValue().get(key);
+        if (StringUtil.isEmpty(parallelismString)) {
+            throw new RuntimeException("redisKey " + key + " 为空。");
+        }
+        final int parallelismBefore = Integer.parseInt(parallelismString);
+        final int parallelismAfter = parallelismBefore + 1;
+        stringRedisTemplate.opsForValue().set(key, parallelismAfter + "");
+        log.info("归还节点 " + nodeName + " 的 CPU 并行度:" + parallelismBefore + " --> " + parallelismAfter);
     }
 
 
@@ -508,13 +518,8 @@ public class ProjectUtil {
     }
 
     public void resetNodeParallelism() {
-        List<KubernetesNodeEntity> initialNodeList = kubernetesConfiguration.getNodeList();
-        List<String> podNameList = KubernetesUtil.getPod(apiClient, kubernetesConfiguration.getNamespace());
-        if (CollectionUtil.isEmpty(podNameList)) {
-            for (KubernetesNodeEntity kubernetesNodeEntity : initialNodeList) {
-                stringRedisTemplate.opsForValue().set("node:" + kubernetesNodeEntity.getName() + ":parallelism", kubernetesNodeEntity.getMaxParallelism() + "");
-            }
-        }
+        kubernetesConfiguration.getNodeList().forEach((node) -> customRedisClient.set("gpu-node:" + node.getName() + ":parallelism", node.getParallelism() + ""));
+        esminiConfiguration.getNodeList().forEach((node) -> customRedisClient.set("cpu-node:" + node.getName() + ":parallelism", node.getParallelism() + ""));
     }
 
     /**