瀏覽代碼

标准化测试扩充

LingxinMeng 2 年之前
父節點
當前提交
f6c861364c

+ 4 - 3
api-common/src/main/java/api/common/util/FileUtil.java

@@ -136,12 +136,13 @@ public class FileUtil {
         return read(getFile(path));
         return read(getFile(path));
     }
     }
 
 
-    public static String cat(String path) throws Exception {
+    @SneakyThrows
+    public static String cat(String path) {
         return read(getFile(path));
         return read(getFile(path));
     }
     }
 
 
     @SneakyThrows
     @SneakyThrows
-    public static String read(File file) throws IOException {
+    public static String read(File file) {
         return read(Files.newInputStream(file.toPath()));
         return read(Files.newInputStream(file.toPath()));
     }
     }
 
 
@@ -1044,7 +1045,7 @@ public class FileUtil {
     public static void writeInputStreamToLocalFile(InputStream inputStream, String filePath, int bufferLength) {
     public static void writeInputStreamToLocalFile(InputStream inputStream, String filePath, int bufferLength) {
         try {
         try {
             final File file1 = new File(filePath);
             final File file1 = new File(filePath);
-            if(file1.exists()){
+            if (file1.exists()) {
                 file1.delete();
                 file1.delete();
             }
             }
             BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
             BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/api/controller/advice/AllExceptionHandlerAdvice.java → simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/api/advice/AllExceptionHandlerAdvice.java

@@ -1,4 +1,4 @@
-package com.css.simulation.resource.scheduler.api.controller.advice;
+package com.css.simulation.resource.scheduler.api.advice;
 
 
 import api.common.pojo.common.ResponseBodyVO;
 import api.common.pojo.common.ResponseBodyVO;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;

+ 76 - 16
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/application/service/ProjectService.java

@@ -400,26 +400,33 @@ public class ProjectService {
                 throw new RuntimeException("未知模型类型:" + modelType);
                 throw new RuntimeException("未知模型类型:" + modelType);
             }
             }
         } else if (DictConstants.PROJECT_WAIT_TYPE_EXPAND.equals(waitingType)) {
         } else if (DictConstants.PROJECT_WAIT_TYPE_EXPAND.equals(waitingType)) {
-            //TODO
             if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
             if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
+                int expandParallelism;
                 if (remainderSimulationLicense <= 0 || remainderParallelism <= 0) {
                 if (remainderSimulationLicense <= 0 || remainderParallelism <= 0) {
-                    wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectMessageModel(projectMessageModel).build());
-                } else if (remainderSimulationLicense < parallelism || remainderParallelism <= parallelism) {
-                    wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - Math.min(remainderSimulationLicense, remainderParallelism)).projectMessageModel(projectMessageModel).build());
-                    run(clusterUserId, Math.min(remainderSimulationLicense, remainderParallelism), projectMessageModel, projectRunningKey, isChoiceGpu);
+                    log.debug("无可用证书或并行度。");
                 } else {
                 } else {
-                    run(clusterUserId, parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
+                    if (remainderSimulationLicense < waitingParallelism || remainderParallelism <= waitingParallelism) {
+                        expandParallelism = Math.min(remainderSimulationLicense, remainderParallelism);
+                        wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(waitingParallelism - expandParallelism).projectMessageModel(projectMessageModel).build());
+                    } else {
+                        expandParallelism = waitingParallelism;
+                    }
+                    expand(projectId, isChoiceGpu, parallelism, expandParallelism);
                 }
                 }
             } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
             } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
                 usingDynamicLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
                 usingDynamicLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
                 remainderDynamicLicense = numDynamicLicense - usingDynamicLicenseNumber;
                 remainderDynamicLicense = numDynamicLicense - usingDynamicLicenseNumber;
+                int expandParallelism;
                 if (remainderSimulationLicense <= 0 || remainderDynamicLicense <= 0 || remainderParallelism <= 0) {
                 if (remainderSimulationLicense <= 0 || remainderDynamicLicense <= 0 || remainderParallelism <= 0) {
-                    wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectMessageModel(projectMessageModel).build());
-                } else if (remainderSimulationLicense < parallelism || remainderDynamicLicense < parallelism || remainderParallelism < parallelism) {
-                    wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - Math.min(Math.min(remainderSimulationLicense, remainderDynamicLicense), remainderParallelism)).projectMessageModel(projectMessageModel).build());
-                    run(clusterUserId, Math.min(remainderSimulationLicense, Math.min(remainderDynamicLicense, remainderParallelism)), projectMessageModel, projectRunningKey, isChoiceGpu);
+                    log.debug("无可用证书或并行度。");
                 } else {
                 } else {
-                    run(clusterUserId, parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
+                    if (remainderSimulationLicense < waitingParallelism || remainderDynamicLicense < waitingParallelism || remainderParallelism < waitingParallelism) {
+                        expandParallelism = Math.min(Math.min(remainderSimulationLicense, remainderDynamicLicense), remainderParallelism);
+                        wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(waitingParallelism - expandParallelism).projectMessageModel(projectMessageModel).build());
+                    } else {
+                        expandParallelism = waitingParallelism;
+                    }
+                    expand(projectId, isChoiceGpu, parallelism, expandParallelism);
                 }
                 }
             } else {
             } else {
                 throw new RuntimeException("未知模型类型:" + modelType);
                 throw new RuntimeException("未知模型类型:" + modelType);
@@ -445,7 +452,14 @@ public class ProjectService {
             } else {
             } else {
                 waitingQueue = JsonUtil.jsonToList(waitingQueueJson, ProjectWaitQueueEntity.class);
                 waitingQueue = JsonUtil.jsonToList(waitingQueueJson, ProjectWaitQueueEntity.class);
             }
             }
-            if (!waitingQueue.contains(projectWaitQueueEntity)) {
+            boolean contains = false;
+            for (ProjectWaitQueueEntity waitQueueEntity : waitingQueue) {
+                if (waitQueueEntity.getProjectMessageModel().getProjectId().equals(projectWaitQueueEntity.getProjectMessageModel().getProjectId())) {
+                    contains = true;
+                    waitQueueEntity.setWaitingParallelism(projectWaitQueueEntity.getWaitingParallelism());
+                }
+            }
+            if (!contains) {
                 waitingQueue.add(projectWaitQueueEntity);
                 waitingQueue.add(projectWaitQueueEntity);
             }
             }
             String newWaitingQueueJson = JsonUtil.listToJson(waitingQueue);
             String newWaitingQueueJson = JsonUtil.listToJson(waitingQueue);
@@ -494,7 +508,6 @@ public class ProjectService {
         TimeUnit.SECONDS.sleep(7);
         TimeUnit.SECONDS.sleep(7);
         // 需要即时启动的任务(并行度的大小)
         // 需要即时启动的任务(并行度的大小)
         ArrayList<String> yamlToRunRedisKeyList = new ArrayList<>();
         ArrayList<String> yamlToRunRedisKeyList = new ArrayList<>();
-        ArrayList<String> yamlToWaitRedisKeyList = new ArrayList<>();
         for (String taskJsonPath : taskJsonList) {
         for (String taskJsonPath : taskJsonList) {
             String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
             String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
             // 保存运行中的任务信息
             // 保存运行中的任务信息
@@ -541,12 +554,9 @@ public class ProjectService {
             String yamlRedisKey = projectDomainService.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
             String yamlRedisKey = projectDomainService.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
             if (currentCount == 0) {
             if (currentCount == 0) {
                 yamlToRunRedisKeyList.add(yamlRedisKey);
                 yamlToRunRedisKeyList.add(yamlRedisKey);
-            } else {
-                yamlToWaitRedisKeyList.add(yamlRedisKey);
             }
             }
             messageNumber++;
             messageNumber++;
         }
         }
-        customRedisClient.set("project:" + projectId + ":waiting-queue", JsonUtil.listToJson(yamlToWaitRedisKeyList));
         TimeUnit.SECONDS.sleep(6);
         TimeUnit.SECONDS.sleep(6);
         log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
         log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
         for (String redisKey : yamlToRunRedisKeyList) {
         for (String redisKey : yamlToRunRedisKeyList) {
@@ -558,6 +568,56 @@ public class ProjectService {
 
 
     }
     }
 
 
+
+    @SneakyThrows
+    public void expand(String projectId, String isChoiceGpu, int totalParallelism, int expandParallelism) {
+        //1 获取剩余并行度和即将使用的各node的并行度
+        Map<String, Integer> remainderNodeMap = projectDomainService.getRemainderNodeMap(isChoiceGpu);
+        Map<String, Integer> nodeMapToUse = projectDomainService.getNodeMapToUse(isChoiceGpu, expandParallelism);
+        //2 将指定 node 的并行度减少
+        nodeMapToUse.keySet().forEach(nodeName -> projectDomainService.decrementParallelism(isChoiceGpu, nodeName, nodeMapToUse.get(nodeName)));
+        //3 获取还未运行的任务 ("project:" + projectId + ":node:" + nodeName + ":yaml")
+        final Set<String> yamlPathCacheKeySet = customRedisClient.getKeySetByPrefixAndContent(stringRedisTemplate, "project:" + projectId + ":node", "yaml");
+        if (CollectionUtil.isNotEmpty(yamlPathCacheKeySet)) {
+            // 根据节点名分组
+            final Map<String, List<String>> yamlPathCacheKeyMapGroupByNodeName = yamlPathCacheKeySet.stream().collect(Collectors.groupingBy(key -> {
+                final String[] split = key.split(":");
+                return split[3];
+            }));
+            // 每个节点分出一部分给两个节点
+            yamlPathCacheKeyMapGroupByNodeName.forEach((nodeNameBefore, yamlPathCacheKeySetGroupByNodeName) -> {
+                final int yamlCount = yamlPathCacheKeySetGroupByNodeName.size();
+                // 修改全部yaml
+                nodeMapToUse.forEach((nodeNameAfter, parallelismToUse) -> {
+                    int shareNum = yamlCount / totalParallelism * parallelismToUse;
+                    for (int i = 0; i < shareNum; i++) {
+                        final String yamlPathCacheKeyBefore = yamlPathCacheKeySetGroupByNodeName.get(i);
+                        final String yamlPathCacheKeyAfter = yamlPathCacheKeyBefore.replace(nodeNameBefore, nodeNameAfter);
+                        final String yamlPath = customRedisClient.get(yamlPathCacheKeyBefore);
+                        final String yamlContentBefore = FileUtil.read(new File(yamlPath));
+                        final String yamlContentAfter = yamlContentBefore.replace(nodeNameBefore, nodeNameAfter);
+                        final boolean rm = FileUtil.rm(yamlPath);
+                        FileUtil.writeStringToLocalFile(yamlContentAfter, yamlPath);
+                        customRedisClient.delete(yamlPathCacheKeyBefore);
+                        customRedisClient.set(yamlPathCacheKeyAfter, yamlPath);
+                        if (i < parallelismToUse) {
+                            // 修改 cpu 编号
+                            // 根据各节点剩余并行度,倒序获取 cpu 编号
+                            int cpuOrderAfter = remainderNodeMap.get(nodeNameAfter) - 1 - i;
+                            String cpuOrderString = String.valueOf(cpuOrderAfter);
+                            final String read = FileUtil.read(yamlPath);
+                            final String replace = read.replace("cpu-order", "\"" + cpuOrderString + "\"");
+                            FileUtil.writeStringToLocalFile(replace, yamlPath);
+                            // 创建 pod
+                            projectDomainService.createPod(projectId, yamlPathCacheKeyAfter, cpuOrderString);
+                        }
+                    }
+                });
+            });
+        }
+    }
+
+
     //* -------------------------------- 逻辑 --------------------------------
     //* -------------------------------- 逻辑 --------------------------------
 
 
 
 

+ 0 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/application/service/TaskService.java

@@ -51,8 +51,6 @@ public class TaskService {
     private CustomConfiguration customConfiguration;
     private CustomConfiguration customConfiguration;
     @Resource
     @Resource
     private TaskDomainService taskDomainService;
     private TaskDomainService taskDomainService;
-    @Resource
-    private ProjectService projectService;
 
 
 
 
     // -------------------------------- Comment --------------------------------
     // -------------------------------- Comment --------------------------------

+ 13 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/domain/service/ProjectDomainService.java

@@ -33,6 +33,7 @@ import javax.annotation.Resource;
 import java.io.File;
 import java.io.File;
 import java.util.*;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 
 /**
 /**
  * 任务结果打分
  * 任务结果打分
@@ -172,14 +173,13 @@ public class ProjectDomainService {
         }
         }
         log.info("保存项目 " + projectId + " 的 yaml 文件:" + yamlPath);
         log.info("保存项目 " + projectId + " 的 yaml 文件:" + yamlPath);
         FileUtil.writeStringToLocalFile(finalYaml, yamlPath);
         FileUtil.writeStringToLocalFile(finalYaml, yamlPath);
+        // 保存 yaml 地址
         String yamlRedisKey = "project:" + projectId + ":node:" + nodeName + ":yaml:" + podName;
         String yamlRedisKey = "project:" + projectId + ":node:" + nodeName + ":yaml:" + podName;
         stringRedisTemplate.opsForValue().set(yamlRedisKey, yamlPath);
         stringRedisTemplate.opsForValue().set(yamlRedisKey, yamlPath);
         return yamlRedisKey;
         return yamlRedisKey;
     }
     }
 
 
 
 
-
-
     public String getIsChoiceGpuByProjectId(String projectId) {
     public String getIsChoiceGpuByProjectId(String projectId) {
         return getProjectByProjectId(projectId).getIsChoiceGpu();
         return getProjectByProjectId(projectId).getIsChoiceGpu();
     }
     }
@@ -311,7 +311,7 @@ public class ProjectDomainService {
         if (podYamlPath == null) {
         if (podYamlPath == null) {
             throw new RuntimeException("根据缓存 key 获取 yaml 地址为 null:" + redisKey);
             throw new RuntimeException("根据缓存 key 获取 yaml 地址为 null:" + redisKey);
         }
         }
-        stringRedisTemplate.delete(redisKey);
+        stringRedisTemplate.delete(redisKey); // 在缓存中删除该yaml,因为已经开始运行。
         String nodeName = new File(podYamlPath).getName().split("#")[0];
         String nodeName = new File(podYamlPath).getName().split("#")[0];
         String podName = podYamlPath.split("#")[1].split("\\.")[0];
         String podName = podYamlPath.split("#")[1].split("\\.")[0];
         stringRedisTemplate.opsForValue().set("project:" + projectId + ":pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
         stringRedisTemplate.opsForValue().set("project:" + projectId + ":pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
@@ -926,4 +926,14 @@ public class ProjectDomainService {
         }
         }
         setWaitQueue(waitQueue);
         setWaitQueue(waitQueue);
     }
     }
+
+
+    public static Set<String> getCacheKeySetByPrefixAndContent(StringRedisTemplate stringRedisTemplate, String prefix) {
+        final Set<String> keys = stringRedisTemplate.keys(prefix + "*");
+        if (CollectionUtil.isNotEmpty(keys)) {
+            return keys.stream().filter(key -> key.contains("yaml")).collect(Collectors.toSet());
+        } else {
+            return new HashSet<>();
+        }
+    }
 }
 }

+ 12 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infrastructure/persistence/redis/CustomRedisClient.java

@@ -1,5 +1,6 @@
 package com.css.simulation.resource.scheduler.infrastructure.persistence.redis;
 package com.css.simulation.resource.scheduler.infrastructure.persistence.redis;
 
 
+import api.common.util.CollectionUtil;
 import lombok.SneakyThrows;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.data.redis.core.StringRedisTemplate;
@@ -7,8 +8,10 @@ import org.springframework.stereotype.Component;
 
 
 import javax.annotation.Resource;
 import javax.annotation.Resource;
 import java.time.Duration;
 import java.time.Duration;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 
 @Component
 @Component
 @Slf4j
 @Slf4j
@@ -17,6 +20,15 @@ public class CustomRedisClient {
     @Resource
     @Resource
     private StringRedisTemplate stringRedisTemplate;
     private StringRedisTemplate stringRedisTemplate;
 
 
+    public  Set<String> getKeySetByPrefixAndContent(StringRedisTemplate stringRedisTemplate, String prefix,String content) {
+        final Set<String> keys = stringRedisTemplate.keys(prefix + "*");
+        if (CollectionUtil.isNotEmpty(keys)) {
+            return keys.stream().filter(key -> key.contains(content)).collect(Collectors.toSet());
+        } else {
+            return new HashSet<>();
+        }
+    }
+
     //* -------------------------------- 基本操作 --------------------------------
     //* -------------------------------- 基本操作 --------------------------------
 
 
     public Set<String> keys(String pattern) {
     public Set<String> keys(String pattern) {

+ 3 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infrastructure/persistence/redis/RedisUtil.java

@@ -27,4 +27,7 @@ public class RedisUtil {
         return stringRedisTemplate.keys(prefix + "*");
         return stringRedisTemplate.keys(prefix + "*");
     }
     }
 
 
+
+
+
 }
 }