Jelajahi Sumber

Merge remote-tracking branch 'origin/master'

wangzhiqiang 2 tahun lalu
induk
melakukan
d1dcc9cf21
19 mengubah file dengan 268 tambahan dan 279 penghapusan
  1. 1 1
      api-common/src/main/java/api/common/pojo/dto/ProjectMessageDTO.java
  2. 3 0
      api-common/src/main/java/api/common/util/StringUtil.java
  3. 1 1
      simulation-oauth-server/src/main/resources/logback-spring.xml
  4. 8 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/configuration/kubernetes/KubernetesConfiguration.java
  5. 46 51
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java
  6. 3 1
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/TaskController.java
  7. 2 2
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskIndexManager.java
  8. 12 21
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java
  9. 19 9
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/AutoSubProjectMapper.java
  10. 4 2
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/ManualProjectMapper.java
  11. 2 2
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/VehicleMapper.java
  12. 3 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/po/ProjectPO.java
  13. 15 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/to/KubernetesNodeTO.java
  14. 66 116
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java
  15. 3 2
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/AlgorithmService.java
  16. 32 37
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java
  17. 10 34
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java
  18. 37 0
      simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java
  19. 1 0
      simulation-resource-scheduler/src/main/resources/kubernetes/template/job/job-template.yaml

+ 1 - 1
api-common/src/main/java/api/common/pojo/dto/ProjectMessageDTO.java

@@ -13,7 +13,7 @@ import lombok.NoArgsConstructor;
  *  "scenePackageId": "sadfasdfs", // 场景包 id
  *  "maxSimulationTime": 11111, // 最大仿真时间
  *  "parallelism": 30  // 并行度
- *  "type": 30  // 并行度
+ *  "type": 30  // 项目类型
  * }
  */
 @Data

+ 3 - 0
api-common/src/main/java/api/common/util/StringUtil.java

@@ -5,6 +5,9 @@ import java.util.UUID;
 
 public class StringUtil {
 
+    public static String[] splitByBlank(String string) {
+        return string.split("\\s+");
+    }
 
     public static String getRandomUUID() {
         return UUID.randomUUID().toString().replace("-", "");

+ 1 - 1
simulation-oauth-server/src/main/resources/logback-spring.xml

@@ -11,7 +11,7 @@
         <property name="LOG_HOME" value="/opt/simulation-cloud/simulation-oauth-server/log"/>
     </springProfile>
     <springProfile name="test">
-        <property name="LOG_HOME" value="/opt/simulation-cloud/simulation-oauth-server/log"/>
+        <property name="LOG_HOME" value="/opt/simulation-cloud/simulation-oauth-server-test/log"/>
     </springProfile>
     <!--输出到控制台-->
     <appender name="console" class="ch.qos.logback.core.ConsoleAppender">

+ 8 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/configuration/kubernetes/KubernetesConfiguration.java

@@ -1,18 +1,26 @@
 package com.css.simulation.resource.scheduler.configuration.kubernetes;
 
+import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.util.ClientBuilder;
 import io.kubernetes.client.util.KubeConfig;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.List;
 
 @Configuration
+@ConfigurationProperties(prefix = "kubernetes")
+@Data
 public class KubernetesConfiguration {
 
+    private List<KubernetesNodeTO> nodeList;
+
     @Bean
     public ApiClient apiClient() throws IOException {
 //        File config = ResourceUtils.getFile("classpath:kubernetes/config");  // 开发环境可用,生产环境不行,无法从jar 包读取

+ 46 - 51
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -8,6 +8,7 @@ import api.common.util.JsonUtil;
 import api.common.util.StringUtil;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.*;
+import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.service.ProjectService;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
@@ -16,13 +17,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -47,39 +47,25 @@ public class ProjectConsumer {
 
     // -------------------------------- Comment --------------------------------
 
-    @Autowired
-    KafkaTemplate<String, String> kafkaTemplate;
-    @Autowired
-    StringRedisTemplate redisTemplate;
-    @Autowired
+    @Resource
+    StringRedisTemplate stringRedisTemplate;
+    @Resource
     ManualProjectMapper manualProjectMapper;
-    @Autowired
-    AutoProjectMapper autoProjectMapper;
-    @Autowired
+    @Resource
     AutoSubProjectMapper autoSubProjectMapper;
-    @Autowired
-    TaskMapper taskMapper;
-    @Autowired
-    IndexMapper indexMapper;
-    @Autowired
-    IndexTemplateMapper indexTemplateMapper;
-    @Autowired
-    SceneMapper sceneMapper;
-    @Autowired
+    @Resource
     VehicleMapper vehicleMapper;
-    @Autowired
+    @Resource
     SensorCameraMapper sensorCameraMapper;
-    @Autowired
+    @Resource
     SensorOgtMapper sensorOgtMapper;
-    @Autowired
-    AlgorithmMapper algorithmMapper;
-    @Autowired
+    @Resource
     UserMapper userMapper;
-    @Autowired
+    @Resource
     ClusterMapper clusterMapper;
-    @Autowired
+    @Resource
     ProjectService projectService;
-    @Autowired
+    @Resource
     ProjectUtil projectUtil;
 
 
@@ -96,7 +82,7 @@ public class ProjectConsumer {
         //1 读取 kafka 的 project 信息
         ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
         String projectId = projectMessageDTO.getProjectId();    // 手动执行项目 id 或 自动执行子项目 id
-        Long parallelism = projectMessageDTO.getParallelism();   // 项目并行度
+        long parallelism = projectMessageDTO.getParallelism();   // 项目并行度
         String projectType = projectMessageDTO.getType(); // 项目类型
         //2 根据 projectId 获取创建用户 id
         String userId;
@@ -119,7 +105,7 @@ public class ProjectConsumer {
         ClusterPO clusterPO;
         if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
             PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
-            run(DictConstants.SYSTEM_CLUSTER_ID, projectId, redisPrefix.getProjectRunningKey(), projectJson);
+            run(DictConstants.SYSTEM_CLUSTER_ID, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
             return;
         } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
             clusterPO = clusterMapper.selectByUserId(userId);
@@ -139,66 +125,74 @@ public class ProjectConsumer {
         int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
         // 获取该集群中正在运行的项目,如果没有则立即执行
         PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
-        Set<String> clusterRunningKeySet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
+        Set<String> clusterRunningKeySet = stringRedisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
         List<String> runningProjectSet;
         if (CollectionUtil.isEmpty(clusterRunningKeySet)) {
-            run(clusterId, projectId, redisPrefix.getProjectRunningKey(), projectJson);
+            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
             return;
         }
         runningProjectSet = projectUtil.getRunningProjectList(clusterRunningKeySet);
         if (CollectionUtil.isEmpty(runningProjectSet)) {
-            run(clusterId, projectId, redisPrefix.getProjectRunningKey(), projectJson);
+            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
             return;
         }
         // 计算正在运行的项目的并行度总和
         long parallelismSum = 0;
         for (String projectKey : runningProjectSet) {
-            String projectJsonTemp = redisTemplate.opsForValue().get(projectKey);
+            String projectJsonTemp = stringRedisTemplate.opsForValue().get(projectKey);
             ProjectMessageDTO projectMessageTemp = JsonUtil.jsonToBean(projectJsonTemp, ProjectMessageDTO.class);
             parallelismSum += projectMessageTemp.getParallelism();
         }
         // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
         if (parallelismSum + parallelism <= simulationLicenseNumber) {
-            run(clusterId, projectId, redisPrefix.getProjectRunningKey(), projectJson);
+            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
         } else {
             wait(clusterId, projectId, redisPrefix.getProjectWaitingKey(), projectJson);
         }
     }
 
-    public void run(String clusterId, String projectId, String projectRunningKey, String projectJson) {
-        log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + " 执行!");
-        redisTemplate.opsForValue().set(projectRunningKey, projectJson);
-        parseProject(projectJson, "cluster:" + clusterId, projectRunningKey);
+    public void run(String clusterId, String projectId, String projectType, String projectRunningKey, String projectJson, long parallelism) {
+
+        //1 获取一个剩余可用并行度最大的节点
+        KubernetesNodeTO maxParallelismPNodeTO = projectUtil.getMaxParallelismPNode();
+        String maxRestParallelismNode = maxParallelismPNodeTO.getName();
+        Long maxRestParallelism = maxParallelismPNodeTO.getMaxParallelism();
+
+        //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
+        if (maxRestParallelism > parallelism) {
+            log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
+            parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, parallelism);
+        } else if (maxRestParallelism > 0) {
+            log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
+            parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, maxRestParallelism);
+        }
     }
 
     public void wait(String clusterId, String projectId, String projectWaitingKey, String projectJson) {
-        log.info("ProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
-        redisTemplate.opsForValue().set(projectWaitingKey, projectJson);
+        log.info("ProjectConsumer--wait 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
+        stringRedisTemplate.opsForValue().set(projectWaitingKey, projectJson);
     }
 
 
     /**
-     * 开始执行以及重新执行
-     *
-     * @param projectJson 项目启动消息
+     * @param projectId
+     * @param projectJson
+     * @param clusterPrefix
+     * @param projectRunningPrefix projectRunningKey
+     * @param nodeName
+     * @param parallelism
      */
     @SneakyThrows
-    public void parseProject(String projectJson, String clusterPrefix, String projectRunningPrefix) {
-
+    public void parseProject(String projectId, String projectType, String projectJson, String clusterPrefix, String projectRunningPrefix, String nodeName, long parallelism) {
         // -------------------------------- 0 准备 --------------------------------
+        projectService.prepare(clusterPrefix, projectId, projectType, projectRunningPrefix, projectJson, nodeName, parallelism);
         log.info("ProjectConsumer--parseManualProject 接收到项目开始消息为:" + projectJson);
-        //1 读取 kafka 的 project 信息
         ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
-        String projectId = projectMessageDTO.getProjectId();    // 项目 id
-        String projectType = projectMessageDTO.getType();    // 项目 id
         String packageId = projectMessageDTO.getScenePackageId();   // 场景测试包 id
         Long maxSimulationTime = projectMessageDTO.getMaxSimulationTime(); // 最大仿真时间,即生成视频的时间长度
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
         String userId = manualProjectMapper.selectCreateUserById(projectId);
-        Long parallelism = projectMessageDTO.getParallelism();    // 并行度
-        //2 执行前准备,删除改项目下所有任务,即重新执行改项目时需要新的测试包
-        projectService.prepare(clusterPrefix, projectId, projectJson);
         // -------------------------------- 1 查询场景 --------------------------------
         //1-1 根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
         List<ScenePO> scenePOList = projectService.handlePackage(projectRunningPrefix, projectId, packageId);
@@ -214,6 +208,7 @@ public class ProjectConsumer {
         String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
         projectService.transferAndRunYaml(
+                nodeName,
                 jobTemplate + "job-template.yaml",
                 projectId,
                 algorithmDockerImage,

+ 3 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/TaskController.java

@@ -9,12 +9,14 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.annotation.Resource;
+
 @RefreshScope
 @RestController
 @RequestMapping("/task")
 public class TaskController {
 
-    @Autowired
+    @Resource
     TaskService taskService;
 
     // -------------------------------- Comment --------------------------------

+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskIndexManager.java

@@ -5,15 +5,15 @@ import com.css.simulation.resource.scheduler.pojo.po.LeafIndexPO;
 import org.apache.ibatis.session.ExecutorType;
 import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.List;
 
 @Component
 public class TaskIndexManager {
 
-    @Autowired
+    @Resource
     private SqlSessionFactory sqlSessionFactory;
 
     public void batchInsertLeafIndex(List<LeafIndexPO> leafTaskIndexList) {

+ 12 - 21
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -2,7 +2,6 @@ package com.css.simulation.resource.scheduler.manager;
 
 import api.common.pojo.constants.DictConstants;
 import api.common.util.*;
-import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.mapper.IndexMapper;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
@@ -25,13 +24,13 @@ import org.apache.http.client.config.RequestConfig;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
+import javax.annotation.Resource;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
@@ -56,29 +55,27 @@ public class TaskManager {
     String evaluationLevelUri;
     @Value("${scheduler.minio-path.project-result}")
     String resultPathMinio;
-    @Autowired
-    ClusterMapper clusterMapper;
-    @Autowired
+    @Resource
     StringRedisTemplate stringRedisTemplate;
-    @Autowired
+    @Resource
     TaskMapper taskMapper;
-    @Autowired
+    @Resource
     MinioClient minioClient;
-    @Autowired
+    @Resource
     ManualProjectMapper manualProjectMapper;
-    @Autowired
+    @Resource
     KafkaTemplate<String, String> kafkaTemplate;
-    @Autowired
+    @Resource
     TaskIndexManager taskIndexManager;
-    @Autowired
+    @Resource
     IndexMapper indexMapper;
-    @Autowired
+    @Resource
     CloseableHttpClient closeableHttpClient;
-    @Autowired
+    @Resource
     RequestConfig requestConfig;
-    @Autowired
+    @Resource
     ProjectUtil projectUtil;
-    @Autowired
+    @Resource
     ApiClient apiClient;
 
     @SneakyThrows
@@ -429,12 +426,6 @@ public class TaskManager {
             log.error("TaskService--taskState 前缀为 " + redisPrefix.getProjectRunningKey() + " 的 key 为空!");
         }
 
-        // 删除所有 key
-//        Set<String> keys = redisTemplate.keys("manualProject:" + projectId + "*");
-//        assert keys != null;
-//        redisTemplate.delete(keys);
-//        log.info("------- /state 任务 " + taskId + " 的父项目为:" + projectId);
-
 
         // 删除 kafka topic
 //        SshClient clientKafka = SshUtil.getClient();

+ 19 - 9
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/AutoSubProjectMapper.java

@@ -1,21 +1,29 @@
 package com.css.simulation.resource.scheduler.mapper;
 
 
-import org.apache.ibatis.annotations.Mapper;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
-import org.apache.ibatis.annotations.Update;
+import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
+import org.apache.ibatis.annotations.*;
+import org.apache.ibatis.type.JdbcType;
 
 import java.sql.Timestamp;
+import java.util.List;
 
 @Mapper
 public interface AutoSubProjectMapper {
 
-    @Update("update simulation_automatic_subproject\n" +
-            "set now_run_state  = #{state}\n" +
-            "where id = #{id}")
-    void updateProjectStateById(@Param("subProjectId") String subProjectId, @Param("nowRunState") String nowRunState);
-
+    @Results(id = "project", value = {
+            @Result(column = "id", property = "id", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "scene", property = "scenePackageId", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "create_user_id", property = "createUserId", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "parallelism", property = "parallelism", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "project_type", property = "projectType", jdbcType = JdbcType.VARCHAR)
+    })
+    @Select("select sas.id, sap.scene, sas.create_user_id, sap.parallelism, '2' project_type\n" +
+            "from simulation_automatic_subproject sas\n" +
+            "         left join simulation_automatic_project sap on sas.parent_id = sap.id\n" +
+            "where sas.is_deleted = '0'\n" +
+            "  and sas.now_run_state = #{nowRunState}")
+    List<ProjectPO> selectByNowRunState(@Param("nowRunState") String nowRunState);
 
     @Select("select create_user_id\n" +
             "from simulation_automatic_subproject\n" +
@@ -33,4 +41,6 @@ public interface AutoSubProjectMapper {
             "    finish_time = #{finishTime}\n" +
             "where id = #{id}")
     void updateNowRunStateAndFinishTimeById(@Param("nowRunState") String state, @Param("finishTime") Timestamp finishTime, @Param("id") String id);
+
+
 }

+ 4 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/ManualProjectMapper.java

@@ -15,7 +15,9 @@ public interface ManualProjectMapper {
     @Results(id = "project", value = {
             @Result(column = "id", property = "id", jdbcType = JdbcType.VARCHAR),
             @Result(column = "scene", property = "scenePackageId", jdbcType = JdbcType.VARCHAR),
-            @Result(column = "create_user_id", property = "createUserId", jdbcType = JdbcType.VARCHAR)
+            @Result(column = "create_user_id", property = "createUserId", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "parallelism", property = "parallelism", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "project_type", property = "projectType", jdbcType = JdbcType.VARCHAR)
     })
     @Select("select id, scene, create_user_id\n" +
             "from simulation_manual_project\n" +
@@ -23,7 +25,7 @@ public interface ManualProjectMapper {
     ProjectPO selectById(@Param("projectId") String projectId);
 
     @ResultMap("project")
-    @Select("select id, scene, create_user_id\n" +
+    @Select("select id, scene, create_user_id, parallelism, '1' project_type\n" +
             "from simulation_manual_project\n" +
             "where is_deleted = '0'\n" +
             "  and now_run_state = #{nowRunState}")

+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/VehicleMapper.java

@@ -8,7 +8,7 @@ import org.apache.ibatis.type.JdbcType;
 public interface VehicleMapper {
 
     @Results(id = "vehicle", value = {
-            @Result(column = "vehicle_model", property = "modelLabel", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "model_label", property = "modelLabel", jdbcType = JdbcType.VARCHAR),
             @Result(column = "max_speed", property = "maxSpeed", jdbcType = JdbcType.DECIMAL),
             @Result(column = "engine_power", property = "enginePower", jdbcType = JdbcType.DECIMAL),
             @Result(column = "max_deceleration", property = "maxDeceleration", jdbcType = JdbcType.DECIMAL),
@@ -27,7 +27,7 @@ public interface VehicleMapper {
             @Result(column = "height_distance", property = "heightDistance", jdbcType = JdbcType.DECIMAL),
             @Result(column = "wheelbase", property = "wheelbase", jdbcType = JdbcType.DECIMAL),
     })
-    @Select("select vehicle_model,\n" +
+    @Select("select model_label,\n" +
             "       max_speed,\n" +
             "       engine_power,\n" +
             "       max_deceleration,\n" +

+ 3 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/po/ProjectPO.java

@@ -14,5 +14,8 @@ public class ProjectPO {
     private String id;
     private String scenePackageId;
     private String createUserId;
+    private String parallelism;
+    private String projectType;
+
 
 }

+ 15 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/to/KubernetesNodeTO.java

@@ -0,0 +1,15 @@
+package com.css.simulation.resource.scheduler.pojo.to;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class KubernetesNodeTO {
+    private String name;
+    private Long maxParallelism;
+}

+ 66 - 116
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -7,13 +7,11 @@ import api.common.util.JsonUtil;
 import api.common.util.StringUtil;
 import api.common.util.TimeUtil;
 import com.css.simulation.resource.scheduler.consumer.ProjectConsumer;
-import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
-import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
-import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-import com.css.simulation.resource.scheduler.mapper.UserMapper;
+import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.ClusterPO;
 import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
+import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.service.TaskService;
 import com.css.simulation.resource.scheduler.util.KubernetesUtil;
@@ -21,13 +19,13 @@ import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import io.kubernetes.client.openapi.ApiClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
@@ -44,25 +42,23 @@ public class ProjectScheduler {
     @Value("${scheduler.linux-path.job-yaml}")
     String jobYaml;
     // -------------------------------- Comment --------------------------------
-    @Autowired
-    StringRedisTemplate redisTemplate;
-    @Autowired
+    @Resource
+    StringRedisTemplate stringRedisTemplate;
+    @Resource
     TaskService taskService;
-    @Autowired
+    @Resource
     TaskMapper taskMapper;
-    @Autowired
+    @Resource
     ClusterMapper clusterMapper;
-    @Autowired
+    @Resource
     ManualProjectMapper manualProjectMapper;
-    @Autowired
+    @Resource
+    AutoSubProjectMapper autoSubProjectMapper;
+    @Resource
     ApiClient apiClient;
-    @Autowired
-    KafkaTemplate<String, String> kafkaTemplate;
-    @Autowired
+    @Resource
     ProjectConsumer projectConsumer;
-    @Autowired
-    UserMapper userMapper;
-    @Autowired
+    @Resource
     ProjectUtil projectUtil;
 
 
@@ -73,10 +69,18 @@ public class ProjectScheduler {
     @SneakyThrows
     public void dispatchProject() {
 
-        //1 查询已经排队的项目,即已经在页面上点击运行
-        List<ProjectPO> projectList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
-        for (ProjectPO project : projectList) {
+        //1 查询已经在页面上点击运行的项目(后面会判断是排队中还是已经运行了)
+        List<ProjectPO> manualProjectList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
+        List<ProjectPO> autoSubProjectList = autoSubProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
+        List<ProjectPO> allProject = new ArrayList<>();
+        allProject.addAll(manualProjectList);
+        allProject.addAll(autoSubProjectList);
+
+
+        for (ProjectPO project : allProject) {
             String projectId = project.getId();
+            String projectType = project.getProjectType();
+            long parallelism = Long.parseLong(project.getParallelism());
             String userId = project.getCreateUserId();
             ClusterPO clusterPO = clusterMapper.selectByUserId(userId);
             if (clusterPO == null) {
@@ -85,55 +89,58 @@ public class ProjectScheduler {
             }
             String clusterId = clusterPO.getId();
             PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
-            if (StringUtil.isNotEmpty(redisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()))) {
-                continue; // 判断项目是否已经在执行,如果执行则 continue
+            // --------------------------------  判断项目是否已经在执行,如果执行则 continue --------------------------------
+            if (StringUtil.isNotEmpty(stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()))) {
+                continue;
             }
+            // -------------------------------- 项目没有执行说明等待中 --------------------------------
             int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
             // 获取该用户正在运行的项目数量
-            Set<String> clusterRunningKeySet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
-            List<String> runningProjectSet;
+            Set<String> clusterRunningKeySet = stringRedisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
+            List<String> runningProjectSet = null;
             // cluster:${clusterId}:running:${projectId}
             if (CollectionUtil.isNotEmpty(clusterRunningKeySet)) {
                 runningProjectSet = projectUtil.getRunningProjectList(clusterRunningKeySet);
                 if (CollectionUtil.isNotEmpty(runningProjectSet)) {
-                    log.info("ProjectScheduler--dispatchProject 运行中的项目的 key 有:"+runningProjectSet);
+                    log.info("ProjectScheduler--dispatchProject 运行中的项目的 key 有:" + runningProjectSet);
                     long parallelismSum = 0;
                     for (String runningProjectKey : runningProjectSet) {
-                        parallelismSum += JsonUtil.jsonToBean(redisTemplate.opsForValue().get(runningProjectKey), ProjectMessageDTO.class).getParallelism();
+                        parallelismSum += JsonUtil.jsonToBean(stringRedisTemplate.opsForValue().get(runningProjectKey), ProjectMessageDTO.class).getParallelism();
                     }
                     if (parallelismSum < simulationLicenseNumber) {
-                        Set<String> waitingProjectSet = redisTemplate.keys(redisPrefix.getClusterWaitingPrefix() + "*");
-                        if (CollectionUtil.isEmpty(waitingProjectSet)) {
+                        if (parallelismSum + parallelism < simulationLicenseNumber) {
+                            run(clusterId, projectId, projectType, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
                             return;
                         }
-                        for (String waitingProjectKey : waitingProjectSet) {
-                            Long parallelism = JsonUtil.jsonToBean(redisTemplate.opsForValue().get(waitingProjectKey), ProjectMessageDTO.class).getParallelism();
-                            if (parallelismSum + parallelism < simulationLicenseNumber) {
-                                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
-                                return;
-                            }
-                        }
                     }
-                } else {
-                    run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
                 }
-            } else {
-                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
+            }
+            if ((CollectionUtil.isEmpty(clusterRunningKeySet) || CollectionUtil.isEmpty(runningProjectSet)) && parallelism < simulationLicenseNumber) {
+                run(clusterId, projectId, projectType, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
             }
         }
     }
 
-    public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey) {
-        String clusterPrefix = "cluster:" + clusterId;
-        String projectJson = redisTemplate.opsForValue().get(projectWaitingKey);
-        redisTemplate.delete(projectWaitingKey);
+
+    public void run(String clusterId, String projectId, String projectType, String projectWaitingKey, String projectRunningKey, long parallelism) {
+        String projectJson = stringRedisTemplate.opsForValue().get(projectWaitingKey);
         if (StringUtil.isEmpty(projectJson)) {
             log.error("ProjectScheduler--run 项目 " + projectId + " 的开始消息查询失败,key 为:" + projectWaitingKey);
             return;
         }
-        redisTemplate.opsForValue().set(projectRunningKey, projectJson);
-        log.info("ProjectScheduler--run 项目 " + projectId + " 从等待队列进入执行状态!");
-        projectConsumer.parseProject(projectJson, clusterPrefix, projectRunningKey);
+        //1 获取一个剩余可用并行度最大的节点
+        KubernetesNodeTO maxParallelismPNodeTO = projectUtil.getMaxParallelismPNode();
+        String maxRestParallelismNode = maxParallelismPNodeTO.getName();
+        Long maxRestParallelism = maxParallelismPNodeTO.getMaxParallelism();
+
+        //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
+        if (maxRestParallelism > parallelism) {
+            log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
+            projectConsumer.parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, parallelism);
+        } else if (maxRestParallelism > 0) {
+            log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
+            projectConsumer.parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, maxRestParallelism);
+        }
     }
 
 
@@ -153,7 +160,7 @@ public class ProjectScheduler {
                 String taskId = task.getId();
                 PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaksId(userId, projectId, taskId);
                 // 获取心跳时间
-                String tickTime = redisTemplate.opsForValue().get(redisPrefix.getTaskTickKey());
+                String tickTime = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskTickKey());
                 if (StringUtil.isEmpty(tickTime)) {
                     log.error(redisPrefix.getTaskTickKey() + ",该 key 的心跳时间为空!");
                     continue;
@@ -161,7 +168,7 @@ public class ProjectScheduler {
                 long lastTickTime = Long.parseLong(tickTime);
                 // 如果心跳超时则更改任务状态为 Aborted
                 if (TimeUtil.getNow() - lastTickTime > timeout) {
-                    String podName = redisTemplate.opsForValue().get(redisPrefix.getTaskPodKey());
+                    String podName = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskPodKey());
                     taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
                 }
             }
@@ -182,21 +189,21 @@ public class ProjectScheduler {
             String projectId = project.getId();
             String userId = project.getCreateUserId();
             PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectId(userId, projectId);
-            String lastNowString = redisTemplate.opsForValue().get(redisPrefix.getProjectCheckKey());
-            // 获取正在运行的 pod 列表
-            List<String> podList = KubernetesUtil.getPod(apiClient, "");
-            int taskNumber = podList.size();
+            String lastNowString = stringRedisTemplate.opsForValue().get(redisPrefix.getProjectCheckKey());
+            // 获取该项目的 job 中正在运行的 pod 数量
+            List<String> allPodList = KubernetesUtil.getPod(apiClient, "");
+            long taskNumber = allPodList.stream().filter(podName -> podName.contains(projectId)).count();
             // 如果没有检查过且 pod 列表为空,则正式开始检查,设置第一次检查时间
-            if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
+            if (StringUtil.isEmpty(lastNowString) && taskNumber == 0L) {
                 log.info("ProjectScheduler--projectCheck 开始检查项目 " + projectId);
-                redisTemplate.opsForValue().set(redisPrefix.getProjectCheckKey(), TimeUtil.getNowString());
+                stringRedisTemplate.opsForValue().set(redisPrefix.getProjectCheckKey(), TimeUtil.getNowString());
                 return;
             }
-            log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
+            log.info("ProjectScheduler--projectCheck kubernetes 的命名空间 default 中正在运行的 pod 有:" + allPodList + ",其中项目 " + projectId + " 的任务个数为:" + taskNumber);
             //  如果两次检查时间超过了 2 分钟,且仍然没有 pod 执行,则准备重启
-            if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0 && Long.parseLong(TimeUtil.getNowString()) - Long.parseLong(lastNowString) > (long) 120 * 1000) {
+            if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0L && Long.parseLong(TimeUtil.getNowString()) - Long.parseLong(lastNowString) > (long) 120 * 1000) {
                 // 删除检查
-                redisTemplate.delete(redisPrefix.getProjectCheckKey());
+                stringRedisTemplate.delete(redisPrefix.getProjectCheckKey());
                 try {
                     // 删除 job
                     KubernetesUtil.deleteJob(apiClient, "default", "project-" + projectId);
@@ -208,61 +215,4 @@ public class ProjectScheduler {
             }
         }
     }
-
-
-//    /**
-//     * 解决 pod 莫名全部关闭但是 job 还在的问题
-//     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
-//     */
-//    @Scheduled(fixedDelay = 30 * 1000)
-//    @SneakyThrows
-//    public void projectCheck() {
-//        SshClient client = SshUtil.getClient();
-//        ClientSession session = SshUtil.getSession(client, hostname, username, password);
-//
-//        //1 查询出正在运行中的 project
-//        List<ProjectPO> projectIdList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
-//        log.info("ProjectScheduler--projectCheck 运行中的项目有:" + projectIdList);
-//        //2 根据 projectId 获取 pod
-//        for (ProjectPO project : projectIdList) {
-//            String projectId = project.getId();
-//            String userId = project.getCreateUserId();
-//            PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectId(userId, projectId);
-//            String checkKey = redisPrefix.getProjectCheckKey();
-//            String lastNowString = redisTemplate.opsForValue().get(checkKey);
-//            String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-//            log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
-//            int taskNumber = StringUtil.countSubString(podList, "project");
-//            if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {  // 为空代表第一次,先设置时间
-//                redisTemplate.opsForValue().set(checkKey, TimeUtil.getNowString());
-//            }
-//            if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {  // 非空则开始检查
-//                // 判断两次是否超过2分钟
-//                //3 如果 pod 为空,则重启 job
-//                long lastNow = Long.parseLong(lastNowString);
-//                long now = Long.parseLong(TimeUtil.getNowString());
-//                if (now - lastNow > (long) 120 * 1000) {
-//                    redisTemplate.opsForValue().set(checkKey, TimeUtil.getNowString());
-//                    SshUtil.execute(session, "kubectl delete job project-" + projectId);
-//                    Thread.sleep(15000);
-//                    while (true) {
-//                        log.info("ProjectScheduler--projectCheck 准备重启项目 " + projectId);
-//                        String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-//                        log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
-//                        int taskNumber2 = StringUtil.countSubString(podList2, "project");
-//                        if (taskNumber2 == 0) {
-//                            break;
-//                        }
-//                    }
-//                    Thread.sleep(15000);
-//                    log.info("ProjectScheduler--projectCheck 重新执行项目" + projectId);
-//                    String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
-//                    SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
-//                }
-//            }
-//        }
-//        session.close();
-//        client.stop();
-//
-//    }
 }

+ 3 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/AlgorithmService.java

@@ -10,14 +10,15 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
 import java.io.InputStream;
 
 @Service
 public class AlgorithmService {
 
-    @Autowired
+    @Resource
     AlgorithmMapper algorithmMapper;
-    @Autowired
+    @Resource
     MinioClient minioClient;
     @Value("${minio.bucket-name}")
     String bucketName;

+ 32 - 37
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -18,18 +18,15 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.impl.client.CloseableHttpClient;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
+import javax.annotation.Resource;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
 @Service
@@ -48,42 +45,44 @@ public class ProjectService {
     String username;
     @Value("${scheduler.host.password}")
     String password;
-
-    // -------------------------------- Comment --------------------------------
-    @Autowired
+    @Resource
     StringRedisTemplate stringRedisTemplate;
-    @Autowired
+    @Resource
     KafkaTemplate<String, String> kafkaTemplate;
-    @Autowired
+    @Resource
     CloseableHttpClient closeableHttpClient;
-    @Autowired
+    @Resource
     RequestConfig requestConfig;
-    @Autowired
+    @Resource
     MinioClient minioClient;
-    @Autowired
+    @Resource
     ManualProjectMapper manualProjectMapper;
-    @Autowired
+    @Resource
     AutoSubProjectMapper autoSubProjectMapper;
-    @Autowired
+    @Resource
     TaskMapper taskMapper;
-    @Autowired
+    @Resource
     IndexTemplateMapper indexTemplateMapper;
-    @Autowired
+    @Resource
     IndexMapper indexMapper;
-    @Autowired
+    @Resource
     SceneMapper sceneMapper;
-    @Autowired
+    @Resource
     AlgorithmMapper algorithmMapper;
-    @Autowired
+    @Resource
     ApiClient apiClient;
-    @Autowired
+    @Resource
     ProjectUtil projectUtil;
 
     // -------------------------------- Comment --------------------------------
 
     @Transactional
-    public void prepare(String clusterPrefix, String projectId, String projectType) {
-        //1 如果集群中有该项目旧的信息则直接删除
+    public void prepare(String clusterPrefix, String projectId, String projectType,String projectRunningKey,String projectJson, String nodeName, long parallelism) {
+        //1 将指定 node 的并行度减少
+        String restParallelismKey = "node:" + nodeName;
+        long restParallelism = Long.parseLong(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(restParallelismKey)));// 剩余可用并行度
+        stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelism) + "");
+        //2 将 redis 中该项目旧的信息则直接删除(包括 waitingKey)
         Set<String> oldKeys = stringRedisTemplate.keys(clusterPrefix + "*");
         if (CollectionUtil.isNotEmpty(oldKeys)) {
             for (String oldKey : oldKeys) {
@@ -92,14 +91,16 @@ public class ProjectService {
                 }
             }
         }
-        //2 将项目状态修改为执行中
+        //3 将项目信息放入 redis
+        stringRedisTemplate.opsForValue().set(projectRunningKey, projectJson);
+        //4 将项目状态修改为执行中
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
             manualProjectMapper.updateNowRunStateById(DictConstants.PROJECT_RUNNING, projectId);   // 修改该 project 的状态为执行中,同时将已完成任务重置为 0 。
         } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
             autoSubProjectMapper.updateNowRunStateById(DictConstants.PROJECT_RUNNING, projectId);
         }
 
-        //3 将该 project 下所有旧的任务和指标得分删除。
+        //5 将该 project 下所有旧的任务和指标得分删除。
         taskMapper.deleteByProject(projectId);
         indexMapper.deleteFirstTargetScoreByProjectId(projectId);
         indexMapper.deleteLastTargetScoreByProjectId(projectId);
@@ -272,7 +273,6 @@ public class ProjectService {
             ObjectMapper objectMapper = new ObjectMapper();
             JsonNode jsonNode = objectMapper.readTree(result);
             JsonNode dataNode = jsonNode.path("data");
-
             String token = dataNode.path("access_token").asText();
 
             /*
@@ -351,11 +351,11 @@ public class ProjectService {
      * @param jobTemplateYamlPathTarget 执行文件
      */
     @SneakyThrows
-    public void transferAndRunYaml(String jobTemplateYamlPathSource, String projectId, String algorithmDockerImage, long completions, long parallelism, String jobTemplateYamlPathTarget) {
+    public void transferAndRunYaml(String nodeName, String jobTemplateYamlPathSource, String projectId, String algorithmDockerImage, long completions, long parallelism, String jobTemplateYamlPathTarget) {
         log.info("ProjectConsumer--transferYaml 项目 " + projectId + " 的完成度为:" + completions);
         log.info("ProjectConsumer--transferYaml 项目 " + projectId + " 的并行度为:" + parallelism);
         String yamlSource = FileUtil.read(jobTemplateYamlPathSource);
-        log.info("ProjectConsumer--transferYaml 模板文件为:" + yamlSource);
+//        log.info("ProjectConsumer--transferYaml 模板文件为:" + yamlSource);
         String replace0 = yamlSource.replace("job-cloud-simulation", "project-" + projectId);
         String replace1 = replace0.replace("vtd-container", "vtd-" + projectId);
         String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
@@ -365,15 +365,8 @@ public class ProjectService {
         String replace6 = replace5.replace("parallelism-number", parallelism + "");
         String replace7 = replace6.replace("apiVers1on", "apiVersion");
         String replace8 = replace7.replace("1atch/v1", "batch/v1");
-        // 根据 kubernetes 的 node 分配 job
-        //1 获取 node 列表
-        //2 获取 node 的 pod 列表
-        //3 获取 pod 数量最少的 node
-        //4 将 job 指定 node 执行
-
-
-        String finalYaml = replace8;
-        log.info("ProjectConsumer--parseManualProject 开始执行 yaml 文件" + finalYaml);
+        String finalYaml = replace8.replace("node-name", nodeName);
+        log.info("ProjectConsumer--parseManualProject 在节点 " + nodeName + " 开始执行 job:" + finalYaml);
         FileUtil.writeStringToLocalFile(finalYaml, jobTemplateYamlPathTarget);
         //  启动
         KubernetesUtil.applyYaml(hostname, username, password, jobTemplateYamlPathTarget);
@@ -395,4 +388,6 @@ public class ProjectService {
             }
         }
     }
+
+
 }

+ 10 - 34
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -2,57 +2,42 @@ package com.css.simulation.resource.scheduler.service;
 
 import api.common.util.SshUtil;
 import com.css.simulation.resource.scheduler.manager.TaskManager;
-import com.css.simulation.resource.scheduler.mapper.IndexTemplateMapper;
-import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
-import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
+
 @Service
 @Slf4j
 public class TaskService {
 
-
-    @Value("${minio.bucket-name}")
-    String bucketName;
     @Value("${scheduler.host.hostname}")
     String hostname;
     @Value("${scheduler.host.username}")
     String username;
     @Value("${scheduler.host.password}")
     String password;
-    @Value("${spring.kafka.delete-command}")
-    String kafkaDeleteCommand;
-    // -------------------------------- Comment --------------------------------
-
-    @Autowired
-    StringRedisTemplate stringRedisTemplate;
-    @Autowired
+    @Resource
     TaskManager taskManager;
-    @Autowired
-    IndexTemplateMapper indexTemplateMapper;
-    @Autowired
-    MinioClient minioClient;
-    @Autowired
+    @Resource
     TaskMapper taskMapper;
-    @Autowired
-    ManualProjectMapper manualProjectMapper;
-    @Autowired
+    @Resource
     ProjectUtil projectUtil;
 
+    // -------------------------------- Comment --------------------------------
 
     @SneakyThrows
     public void taskState(String taskId, String state, String podName) {
+        SshClient sshClient = SshUtil.getClient();
+        ClientSession clientSession = SshUtil.getSession(sshClient, hostname, username, password);
         log.info("TaskService--state 接收到参数为:taskId=" + taskId + ",state=" + state + ",podName=" + podName);
         TaskPO taskPO = taskMapper.selectById(taskId);
         if (taskPO == null) {
@@ -62,8 +47,6 @@ public class TaskService {
         String projectId = taskPO.getPId();
         String userId = taskPO.getCreateUserId();
         PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaksId(userId, projectId, taskId);
-        SshClient sshClient = SshUtil.getClient();
-        ClientSession clientSession = SshUtil.getSession(sshClient, hostname, username, password);
         //1 判断项目是否已完成
         boolean projectCompleted = taskManager.isProjectCompleted(redisPrefix, projectId, taskId, state, podName);
         if (!projectCompleted) {
@@ -71,21 +54,14 @@ public class TaskService {
             sshClient.stop();
             return;
         }
-
         //2 准备打分
         log.info("TaskService--taskState 项目 " + projectId + "准备打分!");
         taskManager.prepareScore(redisPrefix.getProjectRunningKey());
-
         //3 打分
         taskManager.score(redisPrefix, userId, projectId, clientSession);
-
-        // -------------------------------- 收尾 --------------------------------
-
-        taskManager.done(redisPrefix,sshClient, clientSession, projectId);
+        //4 结束
+        taskManager.done(redisPrefix, sshClient, clientSession, projectId);
         log.info("TaskService--taskState 项目 " + projectId + " 执行完成!");
-
-
-
     }
 
 

+ 37 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -2,17 +2,21 @@ package com.css.simulation.resource.scheduler.util;
 
 import api.common.pojo.constants.DictConstants;
 import api.common.util.StringUtil;
+import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
 import com.css.simulation.resource.scheduler.mapper.AutoSubProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.UserMapper;
 import com.css.simulation.resource.scheduler.pojo.po.UserPO;
+import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -34,6 +38,38 @@ public class ProjectUtil {
     ClusterMapper clusterMapper;
     @Autowired
     KafkaTemplate<String, String> kafkaTemplate;
+    @Resource
+    KubernetesConfiguration kubernetesConfiguration;
+    @Resource
+    StringRedisTemplate stringRedisTemplate;
+
+
+    public KubernetesNodeTO getMaxParallelismPNode() {
+        List<KubernetesNodeTO> nodeList = kubernetesConfiguration.getNodeList();
+        String maxRestParallelismNode = "master";
+        long maxRestParallelism = 0L;
+        for (KubernetesNodeTO kubernetesNodeTO : nodeList) {
+            String name = kubernetesNodeTO.getName();
+            Long maxParallelism = kubernetesNodeTO.getMaxParallelism();
+            String restParallelismKey = "node:" + name;
+            String restParallelismString = stringRedisTemplate.opsForValue().get(restParallelismKey);
+            long restParallelism;
+            if (restParallelismString == null) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大剩余可用并行度
+                restParallelism = maxParallelism;
+                stringRedisTemplate.opsForValue().set(restParallelismKey, restParallelism + "");
+            } else {
+                restParallelism = Long.parseLong(restParallelismString);
+            }
+            if (restParallelism > maxParallelism) {
+                maxRestParallelism = restParallelism;
+                maxRestParallelismNode = name;
+            }
+        }
+        return KubernetesNodeTO.builder()
+                .name(maxRestParallelismNode)
+                .maxParallelism(maxRestParallelism)
+                .build();
+    }
 
 
     public PrefixTO getRedisPrefixByUserIdAndProjectIdAndTaksId(String userId, String projectId, String taskId) {
@@ -189,4 +225,5 @@ public class ProjectUtil {
                 .projectCheckKey(projectCheckKey)
                 .build();
     }
+
 }

+ 1 - 0
simulation-resource-scheduler/src/main/resources/kubernetes/template/job/job-template.yaml

@@ -12,6 +12,7 @@ spec:
     metadata:
       name: pod-cloud-simulation
     spec:
+      nodeName: node-name
       hostAliases:
         - ip: 172.17.0.184
           hostnames: