Sfoglia il codice sorgente

标准化测试扩充

LingxinMeng 2 anni fa
parent
commit
c741f583d3

+ 10 - 20
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/application/service/ProjectService.java

@@ -1,7 +1,6 @@
 package com.css.simulation.resource.scheduler.application.service;
 
 import api.common.pojo.constants.DictConstants;
-import api.common.pojo.po.scheduler.SchedulerProjectPO;
 import api.common.util.*;
 import com.css.simulation.resource.scheduler.api.model.ProjectMessageModel;
 import com.css.simulation.resource.scheduler.application.entity.ProjectWaitQueueEntity;
@@ -411,7 +410,7 @@ public class ProjectService {
                     } else {
                         expandParallelism = waitingParallelism;
                     }
-                    expand(projectId, isChoiceGpu, parallelism, expandParallelism);
+                    expand(projectId, isChoiceGpu, parallelism, expandParallelism, expandParallelism == waitingParallelism);
                 }
             } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
                 usingDynamicLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
@@ -426,7 +425,7 @@ public class ProjectService {
                     } else {
                         expandParallelism = waitingParallelism;
                     }
-                    expand(projectId, isChoiceGpu, parallelism, expandParallelism);
+                    expand(projectId, isChoiceGpu, parallelism, expandParallelism, expandParallelism == waitingParallelism);
                 }
             } else {
                 throw new RuntimeException("未知模型类型:" + modelType);
@@ -505,7 +504,7 @@ public class ProjectService {
         List<NodeEntity> nodeListToCount = projectDomainService.getNodeListToCount(nodeMap);
         int messageNumber = 0;
         KafkaUtil.createTopic(kafkaAdminClient, projectId, finalParallelism, (short) 1);   // 创建主题
-        TimeUnit.SECONDS.sleep(7);
+        TimeUnit.SECONDS.sleep(10);
         // 需要即时启动的任务(并行度的大小)
         ArrayList<String> yamlToRunRedisKeyList = new ArrayList<>();
         for (String taskJsonPath : taskJsonList) {
@@ -557,20 +556,19 @@ public class ProjectService {
             }
             messageNumber++;
         }
-        TimeUnit.SECONDS.sleep(6);
+        TimeUnit.SECONDS.sleep(10);
         log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
         for (String redisKey : yamlToRunRedisKeyList) {
             projectDomainService.createPodBegin(projectId, redisKey);
         }
         log.info("项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
         // 项目启动之后删除等待队列中的该项目
-        projectDomainService.removeWaitQueueOfExecute(projectId);
+        projectDomainService.removeWaitQueue(DictConstants.PROJECT_WAIT_TYPE_EXECUTE, projectId);
 
     }
 
 
-    @SneakyThrows
-    public void expand(String projectId, String isChoiceGpu, int totalParallelism, int expandParallelism) {
+    public void expand(String projectId, String isChoiceGpu, int totalParallelism, int expandParallelism, boolean isDone) {
         //1 获取剩余并行度和即将使用的各node的并行度
         Map<String, Integer> remainderNodeMap = projectDomainService.getRemainderNodeMap(isChoiceGpu);
         Map<String, Integer> nodeMapToUse = projectDomainService.getNodeMapToUse(isChoiceGpu, expandParallelism);
@@ -596,7 +594,7 @@ public class ProjectService {
                         final String yamlPath = customRedisClient.get(yamlPathCacheKeyBefore);
                         final String yamlContentBefore = FileUtil.read(new File(yamlPath));
                         final String yamlContentAfter = yamlContentBefore.replace(nodeNameBefore, nodeNameAfter);
-                        final boolean rm = FileUtil.rm(yamlPath);
+                        FileUtil.rm(yamlPath);
                         FileUtil.writeStringToLocalFile(yamlContentAfter, yamlPath);
                         customRedisClient.delete(yamlPathCacheKeyBefore);
                         customRedisClient.set(yamlPathCacheKeyAfter, yamlPath);
@@ -615,6 +613,9 @@ public class ProjectService {
                 });
             });
         }
+        if (isDone) {
+            projectDomainService.removeWaitQueue(DictConstants.PROJECT_WAIT_TYPE_EXPAND, projectId);
+        }
     }
 
 
@@ -746,17 +747,6 @@ public class ProjectService {
         return dockerImage;
     }
 
-    public void stopProject(String projectType, String projectId, String errorMessage) {
-        Optional.ofNullable(errorMessage).ifPresent(em -> {
-            if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-                manualProjectMapper.saveErrorMessage(SchedulerProjectPO.builder().id(projectId).errorMessage(em).modifyUserId(DictConstants.SCHEDULER_USER_ID).modifyTime(TimeUtil.getNowForMysql()).build());
-            } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-                autoSubProjectMapper.saveErrorMessage(SchedulerProjectPO.builder().id(projectId).errorMessage(em).modifyUserId(DictConstants.SCHEDULER_USER_ID).modifyTime(TimeUtil.getNowForMysql()).build());
-            }
-        });
-        stopProject(projectType, projectId);
-    }
-
     /**
      * @param projectId   手动项目 id 或自动项目子id
      * @param projectType 项目类型

+ 21 - 16
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/domain/service/ProjectDomainService.java

@@ -33,7 +33,6 @@ import javax.annotation.Resource;
 import java.io.File;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * 任务结果打分
@@ -916,24 +915,30 @@ public class ProjectDomainService {
         }
     }
 
-    public void removeWaitQueueOfExecute(String projectId) {
-        List<ProjectWaitQueueEntity> waitQueue = getWaitQueue();
-        for (ProjectWaitQueueEntity projectWaitQueueEntity : waitQueue) {
-            if (projectWaitQueueEntity.getProjectMessageModel().getProjectId().equals(projectId) && projectWaitQueueEntity.getWaitingType().equals(DictConstants.PROJECT_WAIT_TYPE_EXECUTE)) {
-                waitQueue.remove(projectWaitQueueEntity);
-                break;
+    public void removeWaitQueue(String waitType, String projectId) {
+        if (DictConstants.PROJECT_WAIT_TYPE_EXECUTE.equals(waitType)) {
+            List<ProjectWaitQueueEntity> waitQueue = getWaitQueue();
+            for (ProjectWaitQueueEntity projectWaitQueueEntity : waitQueue) {
+                if (projectWaitQueueEntity.getProjectMessageModel().getProjectId().equals(projectId) && projectWaitQueueEntity.getWaitingType().equals(DictConstants.PROJECT_WAIT_TYPE_EXECUTE)) {
+                    waitQueue.remove(projectWaitQueueEntity);
+                    break;
+                }
+            }
+            setWaitQueue(waitQueue);
+        } else if (DictConstants.PROJECT_WAIT_TYPE_EXPAND.equals(waitType)) {
+            List<ProjectWaitQueueEntity> waitQueue = getWaitQueue();
+            for (ProjectWaitQueueEntity projectWaitQueueEntity : waitQueue) {
+                if (projectWaitQueueEntity.getProjectMessageModel().getProjectId().equals(projectId) && projectWaitQueueEntity.getWaitingType().equals(DictConstants.PROJECT_WAIT_TYPE_EXPAND)) {
+                    waitQueue.remove(projectWaitQueueEntity);
+                    break;
+                }
             }
+            setWaitQueue(waitQueue);
+        } else {
+            throw new RuntimeException("未知等待类型:" + waitType);
         }
-        setWaitQueue(waitQueue);
+
     }
 
 
-    public static Set<String> getCacheKeySetByPrefixAndContent(StringRedisTemplate stringRedisTemplate, String prefix) {
-        final Set<String> keys = stringRedisTemplate.keys(prefix + "*");
-        if (CollectionUtil.isNotEmpty(keys)) {
-            return keys.stream().filter(key -> key.contains("yaml")).collect(Collectors.toSet());
-        } else {
-            return new HashSet<>();
-        }
-    }
 }