Browse Source

节点分配

martin cách đây 2 năm
mục cha
commit
05fe0904bc

+ 1 - 1
api-common/src/main/java/api/common/pojo/dto/ProjectMessageDTO.java

@@ -13,7 +13,7 @@ import lombok.NoArgsConstructor;
  *  "scenePackageId": "sadfasdfs", // 场景包 id
  *  "maxSimulationTime": 11111, // 最大仿真时间
  *  "parallelism": 30  // 并行度
- *  "type": 30  // 并行度
+ *  "type": 30  // 项目类型
  * }
  */
 @Data

+ 8 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/configuration/kubernetes/KubernetesConfiguration.java

@@ -1,18 +1,26 @@
 package com.css.simulation.resource.scheduler.configuration.kubernetes;
 
+import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.util.ClientBuilder;
 import io.kubernetes.client.util.KubeConfig;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.List;
 
 @Configuration
+@ConfigurationProperties(prefix = "kubernetes")
+@Data
 public class KubernetesConfiguration {
 
+    private List<KubernetesNodeTO> nodeList;
+
     @Bean
     public ApiClient apiClient() throws IOException {
 //        File config = ResourceUtils.getFile("classpath:kubernetes/config");  // 开发环境可用,生产环境不行,无法从jar 包读取

+ 42 - 47
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -8,6 +8,7 @@ import api.common.util.JsonUtil;
 import api.common.util.StringUtil;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.*;
+import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.service.ProjectService;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
@@ -16,13 +17,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -47,39 +47,25 @@ public class ProjectConsumer {
 
     // -------------------------------- Comment --------------------------------
 
-    @Autowired
-    KafkaTemplate<String, String> kafkaTemplate;
-    @Autowired
+    @Resource
     StringRedisTemplate redisTemplate;
-    @Autowired
+    @Resource
     ManualProjectMapper manualProjectMapper;
-    @Autowired
-    AutoProjectMapper autoProjectMapper;
-    @Autowired
+    @Resource
     AutoSubProjectMapper autoSubProjectMapper;
-    @Autowired
-    TaskMapper taskMapper;
-    @Autowired
-    IndexMapper indexMapper;
-    @Autowired
-    IndexTemplateMapper indexTemplateMapper;
-    @Autowired
-    SceneMapper sceneMapper;
-    @Autowired
+    @Resource
     VehicleMapper vehicleMapper;
-    @Autowired
+    @Resource
     SensorCameraMapper sensorCameraMapper;
-    @Autowired
+    @Resource
     SensorOgtMapper sensorOgtMapper;
-    @Autowired
-    AlgorithmMapper algorithmMapper;
-    @Autowired
+    @Resource
     UserMapper userMapper;
-    @Autowired
+    @Resource
     ClusterMapper clusterMapper;
-    @Autowired
+    @Resource
     ProjectService projectService;
-    @Autowired
+    @Resource
     ProjectUtil projectUtil;
 
 
@@ -96,7 +82,7 @@ public class ProjectConsumer {
         //1 读取 kafka 的 project 信息
         ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
         String projectId = projectMessageDTO.getProjectId();    // 手动执行项目 id 或 自动执行子项目 id
-        Long parallelism = projectMessageDTO.getParallelism();   // 项目并行度
+        long parallelism = projectMessageDTO.getParallelism();   // 项目并行度
         String projectType = projectMessageDTO.getType(); // 项目类型
         //2 根据 projectId 获取创建用户 id
         String userId;
@@ -119,7 +105,7 @@ public class ProjectConsumer {
         ClusterPO clusterPO;
         if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
             PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
-            run(DictConstants.SYSTEM_CLUSTER_ID, projectId, redisPrefix.getProjectRunningKey(), projectJson);
+            run(DictConstants.SYSTEM_CLUSTER_ID, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
             return;
         } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
             clusterPO = clusterMapper.selectByUserId(userId);
@@ -142,12 +128,12 @@ public class ProjectConsumer {
         Set<String> clusterRunningKeySet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
         List<String> runningProjectSet;
         if (CollectionUtil.isEmpty(clusterRunningKeySet)) {
-            run(clusterId, projectId, redisPrefix.getProjectRunningKey(), projectJson);
+            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
             return;
         }
         runningProjectSet = projectUtil.getRunningProjectList(clusterRunningKeySet);
         if (CollectionUtil.isEmpty(runningProjectSet)) {
-            run(clusterId, projectId, redisPrefix.getProjectRunningKey(), projectJson);
+            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
             return;
         }
         // 计算正在运行的项目的并行度总和
@@ -159,46 +145,54 @@ public class ProjectConsumer {
         }
         // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
         if (parallelismSum + parallelism <= simulationLicenseNumber) {
-            run(clusterId, projectId, redisPrefix.getProjectRunningKey(), projectJson);
+            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
         } else {
             wait(clusterId, projectId, redisPrefix.getProjectWaitingKey(), projectJson);
         }
     }
 
-    public void run(String clusterId, String projectId, String projectRunningKey, String projectJson) {
-        log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + " 执行!");
-        redisTemplate.opsForValue().set(projectRunningKey, projectJson);
-        parseProject(projectJson, "cluster:" + clusterId, projectRunningKey);
+    public void run(String clusterId, String projectId, String projectType, String projectRunningKey, String projectJson, long parallelism) {
+
+        //1 Pick the node with the largest remaining available parallelism.
+        KubernetesNodeTO maxParallelismPNodeTO = projectUtil.getMaxParallelismPNode();
+        String maxRestParallelismNode = maxParallelismPNodeTO.getName();
+        Long maxRestParallelism = maxParallelismPNodeTO.getMaxParallelism();
+
+        //2 Run at full parallelism when the node has enough spare capacity, otherwise degrade to what is left. NOTE(review): when no capacity remains the project is silently dropped here — the original comment mentioned an "expansion queue" but no enqueue happens; confirm intended behavior.
+        if (maxRestParallelism > parallelism) {
+            log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
+            parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, parallelism);
+        } else if (maxRestParallelism > 0) {
+            log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 以降级并行度 " + maxRestParallelism + " 执行!");
+            parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, maxRestParallelism);
+        }
     }
 
     public void wait(String clusterId, String projectId, String projectWaitingKey, String projectJson) {
-        log.info("ProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
+        log.info("ProjectConsumer--wait 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
         redisTemplate.opsForValue().set(projectWaitingKey, projectJson);
     }
 
 
     /**
-     * 开始执行以及重新执行
-     *
-     * @param projectJson 项目启动消息
+     * @param projectId
+     * @param projectJson
+     * @param clusterPrefix
+     * @param projectRunningPrefix projectRunningKey
+     * @param nodeName
+     * @param parallelism
      */
     @SneakyThrows
-    public void parseProject(String projectJson, String clusterPrefix, String projectRunningPrefix) {
-
+    public void parseProject(String projectId, String projectType, String projectJson, String clusterPrefix, String projectRunningPrefix, String nodeName, long parallelism) {
         // -------------------------------- 0 准备 --------------------------------
+        projectService.prepare(clusterPrefix, projectId, projectType, projectRunningPrefix, projectJson, nodeName, parallelism);
         log.info("ProjectConsumer--parseManualProject 接收到项目开始消息为:" + projectJson);
-        //1 读取 kafka 的 project 信息
         ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
-        String projectId = projectMessageDTO.getProjectId();    // 项目 id
-        String projectType = projectMessageDTO.getType();    // 项目 类型
         String packageId = projectMessageDTO.getScenePackageId();   // 场景测试包 id
         Long maxSimulationTime = projectMessageDTO.getMaxSimulationTime(); // 最大仿真时间,即生成视频的时间长度
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
         String userId = manualProjectMapper.selectCreateUserById(projectId);
-        Long parallelism = projectMessageDTO.getParallelism();    // 并行度
-        //2 执行前准备,删除改项目下所有任务,即重新执行改项目时需要新的测试包
-        projectService.prepare(clusterPrefix, projectId, projectJson);
         // -------------------------------- 1 查询场景 --------------------------------
         //1-1 根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
         List<ScenePO> scenePOList = projectService.handlePackage(projectRunningPrefix, projectId, packageId);
@@ -214,6 +208,7 @@ public class ProjectConsumer {
         String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
         projectService.transferAndRunYaml(
+                nodeName,
                 jobTemplate + "job-template.yaml",
                 projectId,
                 algorithmDockerImage,

+ 19 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/AutoSubProjectMapper.java

@@ -1,16 +1,29 @@
 package com.css.simulation.resource.scheduler.mapper;
 
 
-import org.apache.ibatis.annotations.Mapper;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
-import org.apache.ibatis.annotations.Update;
+import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
+import org.apache.ibatis.annotations.*;
+import org.apache.ibatis.type.JdbcType;
 
 import java.sql.Timestamp;
+import java.util.List;
 
 @Mapper
 public interface AutoSubProjectMapper {
 
+    @Results(id = "project", value = {
+            @Result(column = "id", property = "id", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "scene", property = "scenePackageId", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "create_user_id", property = "createUserId", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "parallelism", property = "parallelism", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "project_type", property = "projectType", jdbcType = JdbcType.VARCHAR)
+    })
+    @Select("select sas.id, sap.scene, sas.create_user_id, sap.parallelism, '2' project_type\n" +
+            "from simulation_automatic_subproject sas\n" +
+            "         left join simulation_automatic_project sap on sas.parent_id = sap.id\n" +
+            "where sas.is_deleted = '0'\n" +
+            "  and sas.now_run_state = #{nowRunState}")
+    List<ProjectPO> selectByNowRunState(@Param("nowRunState") String nowRunState);
 
     @Select("select create_user_id\n" +
             "from simulation_automatic_subproject\n" +
@@ -28,4 +41,6 @@ public interface AutoSubProjectMapper {
             "    finish_time = #{finishTime}\n" +
             "where id = #{id}")
     void updateNowRunStateAndFinishTimeById(@Param("nowRunState") String state, @Param("finishTime") Timestamp finishTime, @Param("id") String id);
+
+
 }

+ 4 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/ManualProjectMapper.java

@@ -15,7 +15,9 @@ public interface ManualProjectMapper {
     @Results(id = "project", value = {
             @Result(column = "id", property = "id", jdbcType = JdbcType.VARCHAR),
             @Result(column = "scene", property = "scenePackageId", jdbcType = JdbcType.VARCHAR),
-            @Result(column = "create_user_id", property = "createUserId", jdbcType = JdbcType.VARCHAR)
+            @Result(column = "create_user_id", property = "createUserId", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "parallelism", property = "parallelism", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "project_type", property = "projectType", jdbcType = JdbcType.VARCHAR)
     })
     @Select("select id, scene, create_user_id\n" +
             "from simulation_manual_project\n" +
@@ -23,7 +25,7 @@ public interface ManualProjectMapper {
     ProjectPO selectById(@Param("projectId") String projectId);
 
     @ResultMap("project")
-    @Select("select id, scene, create_user_id\n" +
+    @Select("select id, scene, create_user_id, parallelism, '1' project_type\n" +
             "from simulation_manual_project\n" +
             "where is_deleted = '0'\n" +
             "  and now_run_state = #{nowRunState}")

+ 3 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/po/ProjectPO.java

@@ -14,5 +14,8 @@ public class ProjectPO {
     private String id;
     private String scenePackageId;
     private String createUserId;
+    private String parallelism;
+    private String projectType;
+
 
 }

+ 15 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/to/KubernetesNodeTO.java

@@ -0,0 +1,15 @@
+package com.css.simulation.resource.scheduler.pojo.to;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class KubernetesNodeTO {
+    private String name;
+    private Long maxParallelism;
+}

+ 52 - 40
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -7,13 +7,11 @@ import api.common.util.JsonUtil;
 import api.common.util.StringUtil;
 import api.common.util.TimeUtil;
 import com.css.simulation.resource.scheduler.consumer.ProjectConsumer;
-import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
-import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
-import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-import com.css.simulation.resource.scheduler.mapper.UserMapper;
+import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.ClusterPO;
 import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
+import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.service.TaskService;
 import com.css.simulation.resource.scheduler.util.KubernetesUtil;
@@ -21,13 +19,14 @@ import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import io.kubernetes.client.openapi.ApiClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
@@ -44,25 +43,27 @@ public class ProjectScheduler {
     @Value("${scheduler.linux-path.job-yaml}")
     String jobYaml;
     // -------------------------------- Comment --------------------------------
-    @Autowired
+    @Resource
     StringRedisTemplate redisTemplate;
-    @Autowired
+    @Resource
     TaskService taskService;
-    @Autowired
+    @Resource
     TaskMapper taskMapper;
-    @Autowired
+    @Resource
     ClusterMapper clusterMapper;
-    @Autowired
+    @Resource
     ManualProjectMapper manualProjectMapper;
-    @Autowired
+    @Resource
+    AutoSubProjectMapper autoSubProjectMapper;
+    @Resource
     ApiClient apiClient;
-    @Autowired
+    @Resource
     KafkaTemplate<String, String> kafkaTemplate;
-    @Autowired
+    @Resource
     ProjectConsumer projectConsumer;
-    @Autowired
+    @Resource
     UserMapper userMapper;
-    @Autowired
+    @Resource
     ProjectUtil projectUtil;
 
 
@@ -73,10 +74,18 @@ public class ProjectScheduler {
     @SneakyThrows
     public void dispatchProject() {
 
-        //1 查询已经排队的项目,即已经在页面上点击运行
-        List<ProjectPO> projectList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
-        for (ProjectPO project : projectList) {
+        //1 查询已经在页面上点击运行的项目(后面会判断是排队中还是已经在运行了)
+        List<ProjectPO> manualProjectList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
+        List<ProjectPO> autoSubProjectList = autoSubProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
+        List<ProjectPO> allProject = new ArrayList<>();
+        allProject.addAll(manualProjectList);
+        allProject.addAll(autoSubProjectList);
+
+
+        for (ProjectPO project : allProject) {
             String projectId = project.getId();
+            String projectType = project.getProjectType();
+            long parallelism = Long.parseLong(project.getParallelism());
             String userId = project.getCreateUserId();
             ClusterPO clusterPO = clusterMapper.selectByUserId(userId);
             if (clusterPO == null) {
@@ -85,55 +94,58 @@ public class ProjectScheduler {
             }
             String clusterId = clusterPO.getId();
             PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
+            // --------------------------------  判断项目是否已经在执行,如果执行则 continue --------------------------------
             if (StringUtil.isNotEmpty(redisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()))) {
-                continue; // 判断项目是否已经在执行,如果执行则 continue
+                continue;
             }
+            // -------------------------------- 项目没有执行说明等待中 --------------------------------
             int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
             // 获取该用户正在运行的项目数量
             Set<String> clusterRunningKeySet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
-            List<String> runningProjectSet;
+            List<String> runningProjectSet = null;
             // cluster:${clusterId}:running:${projectId}
             if (CollectionUtil.isNotEmpty(clusterRunningKeySet)) {
                 runningProjectSet = projectUtil.getRunningProjectList(clusterRunningKeySet);
                 if (CollectionUtil.isNotEmpty(runningProjectSet)) {
-                    log.info("ProjectScheduler--dispatchProject 运行中的项目的 key 有:"+runningProjectSet);
+                    log.info("ProjectScheduler--dispatchProject 运行中的项目的 key 有:" + runningProjectSet);
                     long parallelismSum = 0;
                     for (String runningProjectKey : runningProjectSet) {
                         parallelismSum += JsonUtil.jsonToBean(redisTemplate.opsForValue().get(runningProjectKey), ProjectMessageDTO.class).getParallelism();
                     }
                     if (parallelismSum < simulationLicenseNumber) {
-                        Set<String> waitingProjectSet = redisTemplate.keys(redisPrefix.getClusterWaitingPrefix() + "*");
-                        if (CollectionUtil.isEmpty(waitingProjectSet)) {
+                        if (parallelismSum + parallelism < simulationLicenseNumber) {
+                            run(clusterId, projectId, projectType, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
                             return;
                         }
-                        for (String waitingProjectKey : waitingProjectSet) {
-                            Long parallelism = JsonUtil.jsonToBean(redisTemplate.opsForValue().get(waitingProjectKey), ProjectMessageDTO.class).getParallelism();
-                            if (parallelismSum + parallelism < simulationLicenseNumber) {
-                                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
-                                return;
-                            }
-                        }
                     }
-                } else {
-                    run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
                 }
-            } else {
-                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
+            }
+            if ((CollectionUtil.isEmpty(clusterRunningKeySet) || CollectionUtil.isEmpty(runningProjectSet)) && parallelism < simulationLicenseNumber) {
+                run(clusterId, projectId, projectType, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
             }
         }
     }
 
-    public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey) {
-        String clusterPrefix = "cluster:" + clusterId;
+
+    public void run(String clusterId, String projectId, String projectType, String projectWaitingKey, String projectRunningKey, long parallelism) {
         String projectJson = redisTemplate.opsForValue().get(projectWaitingKey);
-        redisTemplate.delete(projectWaitingKey);
         if (StringUtil.isEmpty(projectJson)) {
             log.error("ProjectScheduler--run 项目 " + projectId + " 的开始消息查询失败,key 为:" + projectWaitingKey);
             return;
         }
-        redisTemplate.opsForValue().set(projectRunningKey, projectJson);
-        log.info("ProjectScheduler--run 项目 " + projectId + " 从等待队列进入执行状态!");
-        projectConsumer.parseProject(projectJson, clusterPrefix, projectRunningKey);
+        //1 Pick the node with the largest remaining available parallelism.
+        KubernetesNodeTO maxParallelismPNodeTO = projectUtil.getMaxParallelismPNode();
+        String maxRestParallelismNode = maxParallelismPNodeTO.getName();
+        Long maxRestParallelism = maxParallelismPNodeTO.getMaxParallelism();
+
+        //2 Run at full parallelism when the node has spare capacity, otherwise degrade to what is left; with no capacity the project keeps waiting for the next scheduling pass.
+        if (maxRestParallelism > parallelism) {
+            log.info("ProjectScheduler--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
+            projectConsumer.parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, parallelism);
+        } else if (maxRestParallelism > 0) {
+            log.info("ProjectScheduler--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 以降级并行度 " + maxRestParallelism + " 执行!");
+            projectConsumer.parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, maxRestParallelism);
+        }
     }
 
 

+ 14 - 29
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -18,8 +18,6 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.sshd.client.SshClient;
-import org.apache.sshd.client.session.ClientSession;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.core.KafkaTemplate;
@@ -28,10 +26,7 @@ import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
 @Service
@@ -82,8 +77,12 @@ public class ProjectService {
     // -------------------------------- Comment --------------------------------
 
     @Transactional
-    public void prepare(String clusterPrefix, String projectId, String projectType) {
-        //1 如果集群中有该项目旧的信息则直接删除
+    public void prepare(String clusterPrefix, String projectId, String projectType,String projectRunningKey,String projectJson, String nodeName, long parallelism) {
+        //1 将指定 node 的并行度减少
+        String restParallelismKey = "node:" + nodeName;
+        long restParallelism = Long.parseLong(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(restParallelismKey)));// 剩余可用并行度
+        stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelism) + "");
+        //2 将 redis 中该项目旧的信息则直接删除(包括 waitingKey)
         Set<String> oldKeys = stringRedisTemplate.keys(clusterPrefix + "*");
         if (CollectionUtil.isNotEmpty(oldKeys)) {
             for (String oldKey : oldKeys) {
@@ -92,14 +91,16 @@ public class ProjectService {
                 }
             }
         }
-        //2 将项目状态修改为执行中
+        //3 将项目信息放入 redis
+        stringRedisTemplate.opsForValue().set(projectRunningKey, projectJson);
+        //4 将项目状态修改为执行中
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
             manualProjectMapper.updateNowRunStateById(DictConstants.PROJECT_RUNNING, projectId);   // 修改该 project 的状态为执行中,同时将已完成任务重置为 0 。
         } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
             autoSubProjectMapper.updateNowRunStateById(DictConstants.PROJECT_RUNNING, projectId);
         }
 
-        //3 将该 project 下所有旧的任务和指标得分删除。
+        //5 将该 project 下所有旧的任务和指标得分删除。
         taskMapper.deleteByProject(projectId);
         indexMapper.deleteFirstTargetScoreByProjectId(projectId);
         indexMapper.deleteLastTargetScoreByProjectId(projectId);
@@ -350,7 +351,7 @@ public class ProjectService {
      * @param jobTemplateYamlPathTarget 执行文件
      */
     @SneakyThrows
-    public void transferAndRunYaml(String jobTemplateYamlPathSource, String projectId, String algorithmDockerImage, long completions, long parallelism, String jobTemplateYamlPathTarget) {
+    public void transferAndRunYaml(String nodeName, String jobTemplateYamlPathSource, String projectId, String algorithmDockerImage, long completions, long parallelism, String jobTemplateYamlPathTarget) {
         log.info("ProjectConsumer--transferYaml 项目 " + projectId + " 的完成度为:" + completions);
         log.info("ProjectConsumer--transferYaml 项目 " + projectId + " 的并行度为:" + parallelism);
         String yamlSource = FileUtil.read(jobTemplateYamlPathSource);
@@ -364,24 +365,8 @@ public class ProjectService {
         String replace6 = replace5.replace("parallelism-number", parallelism + "");
         String replace7 = replace6.replace("apiVers1on", "apiVersion");
         String replace8 = replace7.replace("1atch/v1", "batch/v1");
-        // 根据 kubernetes 的 node 分配 job
-        SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, hostname, username, password);
-        String nodeListString = SshUtil.execute(session, "kubectl get node");
-        String podListString = SshUtil.execute(session, "kubectl get pod -o wide");
-        String[] lineArray = nodeListString.split("\n");
-        String minNodeName = "master";
-        long minPodNumber = 9999L;
-        for (int i = 1; i < lineArray.length; i++) {
-            String[] attributeArray = StringUtil.splitByBlank(lineArray[i]);
-            String nodeName = attributeArray[0];
-            long podNumberOfNode = Arrays.stream(podListString.split("\n")).filter(string -> string.contains(nodeName)).count();
-            if (podNumberOfNode < minPodNumber) {
-                minNodeName = nodeName;
-            }
-        }
-        String finalYaml = replace8.replace("node-name", minNodeName);
-        log.info("ProjectConsumer--parseManualProject 在节点 " + minNodeName + " 开始执行 job:" + finalYaml);
+        String finalYaml = replace8.replace("node-name", nodeName);
+        log.info("ProjectConsumer--parseManualProject 在节点 " + nodeName + " 开始执行 job:" + finalYaml);
         FileUtil.writeStringToLocalFile(finalYaml, jobTemplateYamlPathTarget);
         //  启动
         KubernetesUtil.applyYaml(hostname, username, password, jobTemplateYamlPathTarget);

+ 37 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -2,17 +2,21 @@ package com.css.simulation.resource.scheduler.util;
 
 import api.common.pojo.constants.DictConstants;
 import api.common.util.StringUtil;
+import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
 import com.css.simulation.resource.scheduler.mapper.AutoSubProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.UserMapper;
 import com.css.simulation.resource.scheduler.pojo.po.UserPO;
+import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -34,6 +38,38 @@ public class ProjectUtil {
     ClusterMapper clusterMapper;
     @Autowired
     KafkaTemplate<String, String> kafkaTemplate;
+    @Resource
+    KubernetesConfiguration kubernetesConfiguration;
+    @Resource
+    StringRedisTemplate stringRedisTemplate;
+
+
+    public KubernetesNodeTO getMaxParallelismPNode() {
+        List<KubernetesNodeTO> nodeList = kubernetesConfiguration.getNodeList();
+        String maxRestParallelismNode = "master";    // fallback when no node has spare capacity
+        long maxRestParallelism = 0L;
+        for (KubernetesNodeTO kubernetesNodeTO : nodeList) {
+            String name = kubernetesNodeTO.getName();
+            Long maxParallelism = kubernetesNodeTO.getMaxParallelism();
+            String restParallelismKey = "node:" + name;
+            String restParallelismString = stringRedisTemplate.opsForValue().get(restParallelismKey);
+            long restParallelism;
+            if (restParallelismString == null) {    // first lookup for this node: seed redis with the configured maximum
+                restParallelism = maxParallelism;
+                stringRedisTemplate.opsForValue().set(restParallelismKey, restParallelism + "");
+            } else {
+                restParallelism = Long.parseLong(restParallelismString);
+            }
+            if (restParallelism > maxRestParallelism) {    // fix: compare against the best seen so far, not the node's own cap — the old check (restParallelism > maxParallelism) can never be true since rest <= cap, so every caller got ("master", 0) and no project ever started
+                maxRestParallelism = restParallelism;
+                maxRestParallelismNode = name;
+            }
+        }
+        return KubernetesNodeTO.builder()    // NOTE: the maxParallelism field here carries the REMAINING parallelism of the chosen node
+                .name(maxRestParallelismNode)
+                .maxParallelism(maxRestParallelism)
+                .build();
+    }
 
 
     public PrefixTO getRedisPrefixByUserIdAndProjectIdAndTaksId(String userId, String projectId, String taskId) {
@@ -189,4 +225,5 @@ public class ProjectUtil {
                 .projectCheckKey(projectCheckKey)
                 .build();
     }
+
 }

+ 1 - 1
simulation-resource-scheduler/src/main/resources/kubernetes/template/job/job-template.yaml

@@ -6,13 +6,13 @@ metadata:
   labels:
     user: EY
 spec:
-  nodeName: node-name
   completions: completions-number
   parallelism: parallelism-number
   template:
     metadata:
       name: pod-cloud-simulation
     spec:
+      nodeName: node-name
       hostAliases:
         - ip: 172.17.0.184
           hostnames: