LingxinMeng 2 лет назад
Родитель
Сommit
16a14161e4

+ 1 - 0
api-common/src/main/java/api/common/pojo/constants/DictConstants.java

@@ -8,6 +8,7 @@ public class DictConstants {
     public static final String PROJECT_WAIT_QUEUE_KEY = "project-wait-queue";  // 等待执行
     public static final String PROJECT_WAIT_TYPE_EXECUTE = "1";  // 等待执行
     public static final String PROJECT_WAIT_TYPE_EXPAND = "2";  // 等待扩充
+    public static final String PROJECT_WAIT_TYPE_ALL = "3";  // 等待扩充
     public static final String LICENSE_TYPE_SIMULATION = "1";
     public static final String LICENSE_TYPE_DYNAMIC = "2";
     public static final String MODEL_TYPE_VTD = "1";

+ 4 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/application/entity/ProjectWaitQueueEntity.java

@@ -1,13 +1,13 @@
 package com.css.simulation.resource.scheduler.application.entity;
 
 import com.css.simulation.resource.scheduler.api.model.ProjectMessageModel;
-import lombok.Builder;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
+import lombok.*;
 
-@Data
 @EqualsAndHashCode
+@Data
 @Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class ProjectWaitQueueEntity {
     private String waitingType; //1等待执行 2等待扩充
     private Integer waitingParallelism; // 等待扩充或执行的并行度

+ 12 - 5
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/application/service/ProjectService.java

@@ -549,7 +549,7 @@ public class ProjectService {
                 remainderNodeMap.put(currentNodeName, cpuOrder);
             }
             // 只有准备启动(即 currentCount == 0)的时候才指定 cpu 编号
-            log.info("创建任务 " + taskId + " 的 yaml:是否使用 gpu (0是1否)" + isChoiceGpu + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前节点名称为:" + currentNodeName + ",当前 cpu 编号为:" + cpuOrder);
+            log.debug("创建任务 " + taskId + " 的 yaml:是否使用 gpu (0是1否)" + isChoiceGpu + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前节点名称为:" + currentNodeName + ",当前 cpu 编号为:" + cpuOrder);
             String yamlRedisKey = projectDomainService.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
             if (currentCount == 0) {
                 yamlToRunRedisKeyList.add(yamlRedisKey);
@@ -557,11 +557,11 @@ public class ProjectService {
             messageNumber++;
         }
         TimeUnit.SECONDS.sleep(10);
-        log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
+        log.debug("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
         for (String redisKey : yamlToRunRedisKeyList) {
             projectDomainService.createPodBegin(projectId, redisKey);
         }
-        log.info("项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
+        log.debug("项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
         // 项目启动之后删除等待队列中的该项目
         projectDomainService.removeWaitQueue(DictConstants.PROJECT_WAIT_TYPE_EXECUTE, projectId);
 
@@ -572,17 +572,21 @@ public class ProjectService {
         log.info("扩充项目 {} {} 个并行度", projectId, expandParallelism);
         //1 获取剩余并行度和即将使用的各node的并行度
         Map<String, Integer> remainderNodeMap = projectDomainService.getRemainderNodeMap(isChoiceGpu);
+        log.info("剩余并行度为:" + remainderNodeMap);
         Map<String, Integer> nodeMapToUse = projectDomainService.getNodeMapToUse(isChoiceGpu, expandParallelism);
+        log.info("即将使用的并行度为:" + nodeMapToUse);
         //2 将指定 node 的并行度减少
         nodeMapToUse.keySet().forEach(nodeName -> projectDomainService.decrementParallelism(isChoiceGpu, nodeName, nodeMapToUse.get(nodeName)));
         //3 获取还未运行的任务 ("project:" + projectId + ":node:" + nodeName + ":yaml")
         final Set<String> yamlPathCacheKeySet = customRedisClient.getKeySetByPrefixAndContent(stringRedisTemplate, "project:" + projectId + ":node", "yaml");
+        log.info("项目 {} 还未运行的 yaml 在缓存中的 key 有 {}", projectId, yamlPathCacheKeySet);
         if (CollectionUtil.isNotEmpty(yamlPathCacheKeySet)) {
             // 根据节点名分组
             final Map<String, List<String>> yamlPathCacheKeyMapGroupByNodeName = yamlPathCacheKeySet.stream().collect(Collectors.groupingBy(key -> {
                 final String[] split = key.split(":");
                 return split[3];
             }));
+            log.info("yaml缓存key根据节点分组之后为:" + yamlPathCacheKeyMapGroupByNodeName);
             // 每个节点分出一部分给两个节点
             yamlPathCacheKeyMapGroupByNodeName.forEach((nodeNameBefore, yamlPathCacheKeySetGroupByNodeName) -> {
                 final int yamlCount = yamlPathCacheKeySetGroupByNodeName.size();
@@ -608,6 +612,7 @@ public class ProjectService {
                             final String replace = read.replace("cpu-order", "\"" + cpuOrderString + "\"");
                             FileUtil.writeStringToLocalFile(replace, yamlPath);
                             // 创建 pod
+                            log.info("扩充项目{}的一个并行度成功。", projectId);
                             projectDomainService.createPod(projectId, yamlPathCacheKeyAfter, cpuOrderString);
                         }
                     }
@@ -678,7 +683,7 @@ public class ProjectService {
         String algorithmDirectoryLinuxTempPath;
         String algorithmTarLinuxTempPath = null;
         if (algorithmEntity != null) {
-            log.info("项目" + projectId + "需要使用仿真平台自己的算法 " + algorithmEntity);
+            log.debug("项目" + projectId + "使用仿真平台自己的算法 " + algorithmEntity);
             String algorithmCode = algorithmEntity.getAlgorithmCode();
             String dockerImport = algorithmEntity.getDockerImport();
             dockerImage = dockerConfiguration.getRegistry() + "/algorithm_" + algorithmCode + ":latest";
@@ -718,7 +723,7 @@ public class ProjectService {
                 throw new RuntimeException("算法 " + algorithmId + " 的 mysql 数据有误!");
             }
         } else {
-            log.info("项目" + projectId + "需要使用索为平台算法 " + algorithmId);
+            log.debug("项目" + projectId + "使用索为平台算法 " + algorithmId);
             algorithmTarLinuxTempPath = linuxTempPath + "algorithm/" + algorithmId + ".tar";
             String dockerImageWithoutVersion = dockerConfiguration.getRegistry() + "/algorithm_" + algorithmId;
             dockerImage = dockerImageWithoutVersion + ":latest";
@@ -754,6 +759,8 @@ public class ProjectService {
      */
     @SneakyThrows
     public void stopProject(String projectType, String projectId) {
+        // 删除等待队列中的项目
+        projectDomainService.removeWaitQueue(DictConstants.PROJECT_WAIT_TYPE_ALL, projectId);
         String isChoiceGpu = projectDomainService.getIsChoiceGpuByProjectId(projectId);
         //* -------------------------------- Comment --------------------------------
         ProjectEntity projectEntity = projectDomainService.getProjectByProjectId(projectId);

+ 8 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/domain/service/ProjectDomainService.java

@@ -519,7 +519,6 @@ public class ProjectDomainService {
                     restNodeList.add(kubernetesNodeCopy);
                 }
             }
-            log.info("集群剩余并行度为:" + restNodeList);
             return restNodeList.size() == 0 ? 0 : restNodeList.stream().mapToInt(NodeEntity::getParallelism).sum();
         } else if (DictConstants.USE_CPU.equals(isChoiceGpu)) {
             initialNodeList = kubernetesConfiguration.getCpuNodeList(); // 预设并行度的节点列表
@@ -543,7 +542,7 @@ public class ProjectDomainService {
                     restNodeList.add(kubernetesNodeCopy);
                 }
             }
-            log.info("集群剩余并行度为:" + restNodeList);
+            log.debug("集群剩余并行度为:" + restNodeList);
             return restNodeList.size() == 0 ? 0 : restNodeList.stream().mapToInt(NodeEntity::getParallelism).sum();
         } else {
             throw new RuntimeException("未知是否使用 GPU:" + isChoiceGpu);
@@ -933,6 +932,13 @@ public class ProjectDomainService {
                         break;
                     }
                 }
+            } else if (DictConstants.PROJECT_WAIT_TYPE_ALL.equals(waitType)) {
+                for (ProjectWaitQueueEntity projectWaitQueueEntity : waitQueue) {
+                    if (projectWaitQueueEntity.getProjectMessageModel().getProjectId().equals(projectId)) {
+                        waitQueue.remove(projectWaitQueueEntity);
+                        break;
+                    }
+                }
             } else {
                 throw new RuntimeException("未知等待类型:" + waitType);
             }

+ 25 - 31
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infrastructure/scheduler/AlgorithmScheduler.java

@@ -21,14 +21,13 @@ import java.util.List;
 public class AlgorithmScheduler {
 
     @Resource
-   private DockerConfiguration dockerConfiguration;
+    private DockerConfiguration dockerConfiguration;
     @Resource
-    private  AlgorithmMapper algorithmMapper;
+    private AlgorithmMapper algorithmMapper;
 
 
     /**
      * 定时删除算法镜像
-     * TODO 算法平台暂时手动删除
      */
     @Scheduled(fixedDelay = 60 * 60 * 1000)
     @SneakyThrows
@@ -38,34 +37,29 @@ public class AlgorithmScheduler {
         //1 查询已经导入的算法,根据创建时间排序
         List<AlgorithmEntity> algorithmImportedList = algorithmMapper.selectByDockerImport("1");
         int algorithmImportNumber = algorithmImportedList.size();
-        log.info("kubernetes 各个节点已经导入 " + algorithmImportNumber + " 个算法。");
-        if (algorithmImportNumber < maxAlgorithmImage) {
-            return;
-        }
-        //2 如果已经导入到 docker 中的算法镜像已经达到 maxAlgorithmImage 个,则清理算法镜像到 minAlgorithmImage 个;
-        List<String> algorithmIdToUpdateList = new ArrayList<>();
-        StringBuilder dockerRmiCommand = new StringBuilder("docker rmi ");
-        StringBuilder registryRmCommand = new StringBuilder("rm -rf ");
-        for (int i = minAlgorithmImage; i < algorithmImportedList.size(); i++) {
-            AlgorithmEntity tempAlgorithm = algorithmImportedList.get(i);
-            String tempAlgorithmId = tempAlgorithm.getId();
-            String algorithmCode = tempAlgorithm.getAlgorithmCode();
-            algorithmIdToUpdateList.add(tempAlgorithm.getId());
-            dockerRmiCommand.append(tempAlgorithm.getDockerImage());
-            registryRmCommand.append(tempAlgorithmId).append("_").append(algorithmCode);
-        }
-        //3 修改数据库为未导入
-        algorithmMapper.updateDockerImportAndDockerImageByIdList("0", "", algorithmIdToUpdateList);
-        //4 删除镜像
-        List<DockerNodeEntity> nodeList = dockerConfiguration.getNodeList();
-        for (DockerNodeEntity dockerNodeEntity : nodeList) {
-            String hostname = dockerNodeEntity.getHostname();
-            String username = dockerNodeEntity.getUsername();
-            String password = dockerNodeEntity.getPassword();
-            SshClient client = SshUtil.getClient();
-            ClientSession session = SshUtil.getSession(client, hostname, username, password);
-            SshUtil.execute(session, dockerRmiCommand.toString());
-            SshUtil.stop(client, session);
+        if (algorithmImportNumber >= maxAlgorithmImage) {
+            log.info("kubernetes 各个节点已经导入 " + algorithmImportNumber + " 个算法,上限为 30,已超出。");
+            //2 如果已经导入到 docker 中的算法镜像已经达到 maxAlgorithmImage 个,则清理算法镜像到 minAlgorithmImage 个;
+            List<String> algorithmIdToUpdateList = new ArrayList<>();
+            StringBuilder dockerRmiCommand = new StringBuilder("docker rmi ");
+            for (int i = minAlgorithmImage; i < algorithmImportedList.size(); i++) {
+                AlgorithmEntity tempAlgorithm = algorithmImportedList.get(i);
+                algorithmIdToUpdateList.add(tempAlgorithm.getId());
+                dockerRmiCommand.append(tempAlgorithm.getDockerImage());
+            }
+            //3 修改数据库为未导入
+            algorithmMapper.updateDockerImportAndDockerImageByIdList("0", "", algorithmIdToUpdateList);
+            //4 删除镜像
+            List<DockerNodeEntity> nodeList = dockerConfiguration.getNodeList();
+            for (DockerNodeEntity dockerNodeEntity : nodeList) {
+                String hostname = dockerNodeEntity.getHostname();
+                String username = dockerNodeEntity.getUsername();
+                String password = dockerNodeEntity.getPassword();
+                SshClient client = SshUtil.getClient();
+                ClientSession session = SshUtil.getSession(client, hostname, username, password);
+                SshUtil.execute(session, dockerRmiCommand.toString());
+                SshUtil.stop(client, session);
+            }
         }
     }
 }

+ 3 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infrastructure/scheduler/ProjectScheduler.java

@@ -25,17 +25,17 @@ public class ProjectScheduler {
     /**
      * 调度项目启动
      */
-    @Scheduled(fixedDelay = 1000)
+    @Scheduled(fixedDelay = 10000)
     public void dispatchProject() {
         List<ProjectWaitQueueEntity> projectWaitQueue = projectDomainService.getWaitQueue();
         if (CollectionUtil.isNotEmpty(projectWaitQueue)) {
             ProjectWaitQueueEntity projectWaitQueueEntity = projectWaitQueue.get(0);
             String waitingType = projectWaitQueueEntity.getWaitingType();
             if (DictConstants.PROJECT_WAIT_TYPE_EXECUTE.equals(waitingType)) {
-//                log.info("尝试启动等待中的项目:{}", projectWaitQueueEntity);
+                log.debug("尝试启动等待中的项目:{}", projectWaitQueueEntity);
                 projectService.checkIfCanRun(projectWaitQueueEntity);
             } else if (DictConstants.PROJECT_WAIT_TYPE_EXPAND.equals(waitingType)) {
-                log.info("尝试扩充运行中的项目:{}", projectWaitQueueEntity);
+                log.debug("尝试扩充运行中的项目:{}", projectWaitQueueEntity);
                 ProjectMessageModel projectMessageModel = projectWaitQueueEntity.getProjectMessageModel();
                 String projectId = projectMessageModel.getProjectId();
                 ProjectEntity project = projectDomainService.getProjectByProjectId(projectId);