Browse Source

标准化测试

LingxinMeng 2 years ago
parent
commit
89d4eb2b66

+ 1 - 0
api-common/src/main/java/api/common/pojo/constants/DictConstants.java

@@ -3,6 +3,7 @@ package api.common.pojo.constants;
 @SuppressWarnings("all")
 public class DictConstants {
 
+    public static final String PROJECT_WAIT_QUEUE_KEY = "project-wait-queue";  // 等待执行
     public static final String PROJECT_WAIT_TYPE_EXECUTE = "1";  // 等待执行
     public static final String PROJECT_WAIT_TYPE_EXPAND = "2";  // 等待扩充
     public static final String LICENSE_TYPE_SIMULATION = "1";

+ 2 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/api/model/ProjectMessageModel.java

@@ -1,9 +1,6 @@
 package com.css.simulation.resource.scheduler.api.model;
 
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.*;
 
 /**
  * {
@@ -20,6 +17,7 @@ import lombok.NoArgsConstructor;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
+@EqualsAndHashCode
 public class ProjectMessageModel {
 
     private String projectId;// 项目 id

+ 12 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/application/entity/ProjectWaitQueueEntity.java

@@ -0,0 +1,12 @@
+package com.css.simulation.resource.scheduler.application.entity;
+
+import com.css.simulation.resource.scheduler.api.model.ProjectMessageModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode
+public class ProjectWaitQueueEntity {
+    private String waitingType; //1等待执行 2等待扩充
+    private ProjectMessageModel projectMessageModel; // 项目详情
+}

+ 0 - 9
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/application/entity/ProjectWaitingQueueEntity.java

@@ -1,9 +0,0 @@
-package com.css.simulation.resource.scheduler.application.entity;
-
-import lombok.Data;
-
-@Data
-public class ProjectWaitingQueueEntity {
-    private String projectId;
-    private String waitingType; //1等待执行 2等待扩充
-}

+ 56 - 58
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/application/service/ProjectService.java

@@ -4,6 +4,7 @@ import api.common.pojo.constants.DictConstants;
 import api.common.pojo.po.scheduler.SchedulerProjectPO;
 import api.common.util.*;
 import com.css.simulation.resource.scheduler.api.model.ProjectMessageModel;
+import com.css.simulation.resource.scheduler.application.entity.ProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.application.entity.VehicleEntity;
 import com.css.simulation.resource.scheduler.domain.service.ProjectDomainService;
 import com.css.simulation.resource.scheduler.domain.service.TaskDomainService;
@@ -20,6 +21,7 @@ import com.css.simulation.resource.scheduler.infrastructure.persistence.minio.Mi
 import com.css.simulation.resource.scheduler.infrastructure.persistence.mysql.mapper.*;
 import com.css.simulation.resource.scheduler.infrastructure.persistence.redis.CustomRedisClient;
 import com.css.simulation.resource.scheduler.infrastructure.persistence.redis.RedisUtil;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
@@ -327,6 +329,8 @@ public class ProjectService {
         String projectId = projectMessageModel.getProjectId();    // 手动执行项目 id 或 自动执行子项目 id
         long parallelism = projectMessageModel.getParallelism();   // 项目并行度
         String projectType = projectMessageModel.getType(); // 项目类型
+        final String isChoiceGpu = projectDomainService.getIsChoiceGpuByProjectId(projectId);
+        final int remainderParallelism = projectDomainService.getRemainderParallelism(isChoiceGpu);
         //2 获取用户信息(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
         final UserEntity userEntity = projectDomainService.getUserEntityByProjectIdAndProjectType(projectId, projectType);
         String projectUserId = userEntity.getId();
@@ -336,10 +340,18 @@ public class ProjectService {
         ClusterEntity clusterEntity;
         String clusterUserId;  // 项目实际运行使用的用户集群
         if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
-            clusterUserId = DictConstants.SYSTEM_CLUSTER_ID;
+            clusterUserId = DictConstants.SYSTEM_USER_ID;
             log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
             PrefixEntity redisPrefix = projectDomainService.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
-            run(projectMessageModel, clusterUserId, modelType, DictConstants.SYSTEM_USER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
+            final String projectRunningKey = redisPrefix.getProjectRunningKey();
+            if (remainderParallelism <= 0) {
+                wait(DictConstants.PROJECT_WAIT_TYPE_EXECUTE, projectMessageModel);
+            } else if (remainderParallelism < parallelism) {
+                wait(DictConstants.PROJECT_WAIT_TYPE_EXPAND, projectMessageModel);
+                run(clusterUserId, remainderParallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
+            } else {
+                run(clusterUserId, (int) parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
+            }
             return;
         } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
             clusterUserId = projectUserId;
@@ -366,8 +378,6 @@ public class ProjectService {
         final Integer usingDynamicLicenseNumber;
         final Integer numSimulationLicense = clusterEntity.getNumSimulationLicense();
         final Integer numDynamicLicense = clusterEntity.getNumDynamicLicense();
-        final String clusterId = clusterEntity.getId();
-        final String projectWaitingKey = redisPrefix.getProjectWaitingKey();
         final String projectRunningKey = redisPrefix.getProjectRunningKey();
         //1 判断仿真证书是否够用,如果证书为0则将项目加入等待队列;如果证书小于并行度则加入扩充队列,并用现有证书执行;如果证书够用,直接执行。
         final int remainderSimulationLicense = numSimulationLicense - usingSimulationLicenseNumber;
@@ -375,22 +385,24 @@ public class ProjectService {
 
         // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
         if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
-            if (remainderSimulationLicense <= 0) {
-                wait(DictConstants.PROJECT_WAIT_TYPE_EXECUTE, projectWaitingKey, projectMessageModel);
-            } else if (remainderSimulationLicense < parallelism) {
-                wait(DictConstants.PROJECT_WAIT_TYPE_EXPAND, projectWaitingKey, projectMessageModel);
+            if (remainderSimulationLicense <= 0 || remainderParallelism <= 0) {
+                wait(DictConstants.PROJECT_WAIT_TYPE_EXECUTE, projectMessageModel);
+            } else if (remainderSimulationLicense < parallelism || remainderParallelism <= parallelism) {
+                wait(DictConstants.PROJECT_WAIT_TYPE_EXPAND, projectMessageModel);
+                run(clusterUserId, Math.min(remainderSimulationLicense, remainderParallelism), projectMessageModel, projectRunningKey, isChoiceGpu);
             } else {
-                run(projectMessageModel, clusterUserId, modelType, clusterId, projectRunningKey, projectWaitingKey);
+                run(clusterUserId, (int) parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
             }
         } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
             usingDynamicLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
             remainderDynamicLicense = numDynamicLicense - usingDynamicLicenseNumber;
-            if (remainderSimulationLicense <= 0 || remainderDynamicLicense <= 0) {
-                wait(DictConstants.PROJECT_WAIT_TYPE_EXECUTE, projectWaitingKey, projectMessageModel);
-            } else if (remainderSimulationLicense < parallelism || remainderDynamicLicense < parallelism) {
-                wait(DictConstants.PROJECT_WAIT_TYPE_EXPAND, projectWaitingKey, projectMessageModel);
+            if (remainderSimulationLicense <= 0 || remainderDynamicLicense <= 0 || remainderParallelism <= 0) {
+                wait(DictConstants.PROJECT_WAIT_TYPE_EXECUTE, projectMessageModel);
+            } else if (remainderSimulationLicense < parallelism || remainderDynamicLicense < parallelism || remainderParallelism < parallelism) {
+                wait(DictConstants.PROJECT_WAIT_TYPE_EXPAND, projectMessageModel);
+                run(clusterUserId, Math.min(remainderSimulationLicense, Math.min(remainderDynamicLicense, remainderParallelism)), projectMessageModel, projectRunningKey, isChoiceGpu);
             } else {
-                run(projectMessageModel, clusterUserId, modelType, clusterId, projectRunningKey, projectWaitingKey);
+                run(clusterUserId, (int) parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
             }
         } else {
             throw new RuntimeException("未知模型类型:" + modelType);
@@ -401,50 +413,33 @@ public class ProjectService {
 
     /**
      * @param waitType            等待类型 1等待执行 2等待扩充
-     * @param projectWaitingKey   项目等待 key
      * @param projectMessageModel 项目信息
      */
-    public void wait(String waitType, String projectWaitingKey, ProjectMessageModel projectMessageModel) {
-        //1 获取 redis 中的等待列表
-
-        if (DictConstants.PROJECT_WAIT_TYPE_EXECUTE.equals(waitType)) {
-
-        } else if (DictConstants.PROJECT_WAIT_TYPE_EXPAND.equals(waitType)) {
-
-        }
-        customRedisClient.set(projectWaitingKey, JsonUtil.beanToJson(projectMessageModel));
-    }
-
-
-    //* -------------------------------- 运行 --------------------------------
-
-    /**
-     * @param projectMessageModel 初始接收到的项目启动信息
-     * @param clusterId           集群ID
-     * @param projectRunningKey   projectRunningKey
-     * @param projectWaitingKey   projectWaitingKey
-     */
-    public void run(ProjectMessageModel projectMessageModel, String clusterUserId, String modelType, String clusterId, String projectRunningKey, String projectWaitingKey) {
-        String projectId = projectMessageModel.getProjectId();    // 项目 id
-        String isChoiceGpu = projectDomainService.getIsChoiceGpuByProjectId(projectId);
-        int parallelism = projectMessageModel.getParallelism();  // 期望并行度
-        //1 获取集群剩余可用并行度
-        int restParallelism = projectDomainService.getRestParallelism(isChoiceGpu);
-        //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
-        if (restParallelism > 0L) {
-            log.info("集群 " + clusterId + " 执行项目 " + projectId);
-            if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)) {
-                projectDomainService.useLicense(clusterUserId, modelType, parallelism);
+    public void wait(String waitType, ProjectMessageModel projectMessageModel) {
+        try {
+            //1 创建等待队列元素对象
+            final ProjectWaitQueueEntity projectWaitQueueEntity = new ProjectWaitQueueEntity();
+            projectWaitQueueEntity.setWaitingType(waitType);
+            projectWaitQueueEntity.setProjectMessageModel(projectMessageModel);
+            //2 创建等待列表对象
+            final String waitingQueueJson = customRedisClient.get(DictConstants.PROJECT_WAIT_QUEUE_KEY);
+            List<ProjectWaitQueueEntity> waitingQueue;
+            if (StringUtil.isEmpty(waitingQueueJson)) {
+                waitingQueue = new ArrayList<>();
+            } else {
+                waitingQueue = JsonUtil.jsonToList(waitingQueueJson, ProjectWaitQueueEntity.class);
             }
-            // 设置实际的并行度
-            projectMessageModel.setCurrentParallelism(Math.min(restParallelism, parallelism));   // 设置实际的并行度
-            parseProject(projectMessageModel, projectRunningKey, isChoiceGpu);
-        } else {
-            log.info("服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
-            wait(DictConstants.PROJECT_WAIT_TYPE_EXECUTE, projectWaitingKey, projectMessageModel);
+            if (!waitingQueue.contains(projectWaitQueueEntity)) {
+                waitingQueue.add(projectWaitQueueEntity);
+            }
+            String newWaitingQueueJson = JsonUtil.listToJson(waitingQueue);
+            customRedisClient.set(DictConstants.PROJECT_WAIT_QUEUE_KEY, newWaitingQueueJson);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
         }
     }
 
+
     /**
      * 运行项目
      *
@@ -452,9 +447,13 @@ public class ProjectService {
      * @param projectRunningKey   projectRunningKey
      */
     @SneakyThrows
-    public void parseProject(ProjectMessageModel projectMessageModel, String projectRunningKey, String isChoiceGpu) {
-        String projectId = projectMessageModel.getProjectId();    // 项目 id
+    public void run(String clusterUserId, int finalParallelism, ProjectMessageModel projectMessageModel, String projectRunningKey, String isChoiceGpu) {
+        Map<String, Integer> remainderNodeMap = projectDomainService.getRemainderNodeMap(isChoiceGpu);
         String modelType = projectMessageModel.getModelType();
+        if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)) {
+            projectDomainService.useLicense(clusterUserId, modelType, finalParallelism);
+        }
+        String projectId = projectMessageModel.getProjectId();    // 项目 id
         String vehicleConfigId = projectMessageModel.getVehicleConfigId();
         int currentParallelism = projectMessageModel.getCurrentParallelism();   // 当前并行度
         String algorithmId = projectMessageModel.getAlgorithmId();    // 算法 id
@@ -466,8 +465,7 @@ public class ProjectService {
         projectMessageModel.setTaskCompleted(0);
         // 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
         //1 获取剩余并行度和即将使用的各node的并行度
-        Map<String, Integer> nodeMap0 = projectDomainService.getNodeMap(isChoiceGpu);
-        Map<String, Integer> nodeMap = projectDomainService.getNodeMapToUse(isChoiceGpu, Math.min(currentParallelism, taskTotal));
+        Map<String, Integer> nodeMap = projectDomainService.getNodeMapToUse(isChoiceGpu, finalParallelism);
         //2 将指定 node 的并行度减少
         nodeMap.keySet().forEach(nodeName -> projectDomainService.decrementParallelism(isChoiceGpu, nodeName, nodeMap.get(nodeName)));
         // 重新设置实际使用的并行度并保存到 redis
@@ -522,8 +520,8 @@ public class ProjectService {
             Integer cpuOrder = null;
             if (currentCount == 0) {
                 // 根据各节点剩余并行度,倒序获取 cpu 编号
-                cpuOrder = nodeMap0.get(currentNodeName) - 1;
-                nodeMap0.put(currentNodeName, cpuOrder);
+                cpuOrder = remainderNodeMap.get(currentNodeName) - 1;
+                remainderNodeMap.put(currentNodeName, cpuOrder);
             }
             // 只有准备启动(即 currentCount == 0)的时候才指定 cpu 编号
             log.info("创建任务 " + taskId + " 的 yaml:是否使用 gpu (0是1否)" + isChoiceGpu + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前节点名称为:" + currentNodeName + ",当前 cpu 编号为:" + cpuOrder);

+ 18 - 13
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/domain/service/ProjectDomainService.java

@@ -4,19 +4,21 @@ import api.common.pojo.common.ResponseBodyVO;
 import api.common.pojo.constants.DictConstants;
 import api.common.util.*;
 import com.css.simulation.resource.scheduler.api.model.ProjectMessageModel;
+import com.css.simulation.resource.scheduler.application.entity.ProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.infrastructure.configuration.custom.CustomConfiguration;
 import com.css.simulation.resource.scheduler.infrastructure.configuration.entity.NodeEntity;
 import com.css.simulation.resource.scheduler.infrastructure.configuration.kubernetes.KubernetesConfiguration;
 import com.css.simulation.resource.scheduler.infrastructure.entity.PrefixEntity;
 import com.css.simulation.resource.scheduler.infrastructure.entity.ProjectEntity;
 import com.css.simulation.resource.scheduler.infrastructure.entity.UserEntity;
+import com.css.simulation.resource.scheduler.infrastructure.persistence.kubernetes.KubernetesUtil;
 import com.css.simulation.resource.scheduler.infrastructure.persistence.mysql.mapper.AutoSubProjectMapper;
 import com.css.simulation.resource.scheduler.infrastructure.persistence.mysql.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.infrastructure.persistence.mysql.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.infrastructure.persistence.mysql.mapper.UserMapper;
 import com.css.simulation.resource.scheduler.infrastructure.persistence.redis.CustomRedisClient;
-import com.css.simulation.resource.scheduler.infrastructure.persistence.kubernetes.KubernetesUtil;
 import com.css.simulation.resource.scheduler.infrastructure.persistence.redis.RedisUtil;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.kubernetes.client.openapi.ApiClient;
@@ -33,7 +35,6 @@ import javax.annotation.Resource;
 import java.io.File;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * 任务结果打分
@@ -219,7 +220,7 @@ public class ProjectDomainService {
      *
      * @return 节点映射(节点名,并行度)
      */
-    public Map<String, Integer> getNodeMap(String isChoiceGpu) {
+    public Map<String, Integer> getRemainderNodeMap(String isChoiceGpu) {
         Map<String, Integer> resultNodeMap = new HashMap<>();    // 用于执行的节点映射(节点名,并行度)
         if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
             List<NodeEntity> initialNodeList = kubernetesConfiguration.getGpuNodeList();
@@ -379,7 +380,7 @@ public class ProjectDomainService {
      *
      * @return 集群剩余并行度
      */
-    public int getRestParallelism(String isChoiceGpu) {
+    public int getRemainderParallelism(String isChoiceGpu) {
         List<NodeEntity> initialNodeList; // 预设并行度的节点列表
         if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
             initialNodeList = kubernetesConfiguration.getGpuNodeList(); // 预设并行度的节点列表
@@ -572,15 +573,6 @@ public class ProjectDomainService {
     }
 
 
-    public List<String> getWaitingProjectMessageKeys() {
-        final Set<String> keys = stringRedisTemplate.keys("*");
-        if (CollectionUtil.isEmpty(keys)) {
-            return new ArrayList<>();
-        } else {
-            return keys.stream().filter(key -> key.contains("cluster") && key.contains("waiting")).collect(Collectors.toList());
-        }
-    }
-
     public void resetNodeParallelism() {
         //* -------------------------------- GPU节点 --------------------------------
         final List<NodeEntity> gpuNodeList = kubernetesConfiguration.getGpuNodeList();
@@ -784,4 +776,17 @@ public class ProjectDomainService {
             throw new RuntimeException("未知项目类型:" + projectType);
         }
     }
+
+    public List<ProjectWaitQueueEntity> getWaitQueue() {
+        try {
+            final String waitQueueJson = customRedisClient.get(DictConstants.PROJECT_WAIT_QUEUE_KEY);
+            if (StringUtil.isNotEmpty(waitQueueJson)){
+                return JsonUtil.jsonToList(waitQueueJson, ProjectWaitQueueEntity.class);
+            }else{
+                return null;
+            }
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }

+ 13 - 12
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infrastructure/scheduler/ProjectScheduler.java

@@ -1,13 +1,12 @@
 package com.css.simulation.resource.scheduler.infrastructure.scheduler;
 
-import com.css.simulation.resource.scheduler.api.model.ProjectMessageModel;
+import api.common.pojo.constants.DictConstants;
 import api.common.util.CollectionUtil;
-import api.common.util.JsonUtil;
+import com.css.simulation.resource.scheduler.api.model.ProjectMessageModel;
+import com.css.simulation.resource.scheduler.application.entity.ProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.application.service.ProjectService;
 import com.css.simulation.resource.scheduler.domain.service.ProjectDomainService;
-import com.css.simulation.resource.scheduler.infrastructure.persistence.redis.RedisUtil;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
@@ -17,8 +16,6 @@ import java.util.List;
 @Component
 @Slf4j
 public class ProjectScheduler {
-    @Resource
-    private StringRedisTemplate stringRedisTemplate;
     @Resource
     private ProjectService projectService;
     @Resource
@@ -29,12 +26,16 @@ public class ProjectScheduler {
      */
     @Scheduled(fixedDelay = 5000)
     public void dispatchProject() {
-        List<String> projectMessageKeys = projectDomainService.getWaitingProjectMessageKeys();
-        if (!CollectionUtil.isEmpty(projectMessageKeys)) {
-            log.info("尝试启动等待中的项目:" + projectMessageKeys);
-            for (String projectMessageKey : projectMessageKeys) {
-                final String projectMessage = RedisUtil.getStringByKey(stringRedisTemplate, projectMessageKey);
-                projectService.checkIfCanRun(JsonUtil.jsonToBean(projectMessage, ProjectMessageModel.class));
+
+        List<ProjectWaitQueueEntity> projectWaitQueue = projectDomainService.getWaitQueue();
+        if (CollectionUtil.isNotEmpty(projectWaitQueue)) {
+            log.info("尝试启动等待中的项目:{}", projectWaitQueue);
+            for (ProjectWaitQueueEntity projectWaitQueueEntity : projectWaitQueue) {
+                final String waitingType = projectWaitQueueEntity.getWaitingType();
+                final ProjectMessageModel projectMessageModel = projectWaitQueueEntity.getProjectMessageModel();
+                if (DictConstants.PROJECT_WAIT_TYPE_EXECUTE.equals(waitingType)) {
+                    projectService.checkIfCanRun(projectMessageModel);
+                }
             }
         }
     }