martin, 2 years ago
parent
commit c20b362de7

+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -147,7 +147,7 @@ public class ProjectConsumer {
         String projectId = projectMessageDTO.getProjectId();
         int parallelism = projectMessageDTO.getParallelism();  // 期望并行度
         //1 获取所有节点的剩余可用并行度
-        Map<String, Integer> nodeMap = projectUtil.getNodeMap(parallelism);
+        Map<String, Integer> nodeMap = projectUtil.getNodeMapToUse(parallelism);
         if (nodeMap.size() == 0) {
             return;
         }
@@ -190,7 +190,7 @@ public class ProjectConsumer {
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
         // -------------------------------- 0 准备 --------------------------------
-        projectService.prepare(nodeMap, projectMessageDTO, clusterPrefix, projectRunningKey);
+        projectService.prepare(nodeMap, projectMessageDTO, clusterPrefix);
         String userId = null;
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
             userId = manualProjectMapper.selectCreateUserById(projectId);
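
The consumer now asks ProjectUtil for a per-node map of remaining parallelism instead of a single node. As a rough illustration only (the helper and names below are not part of the commit), this is how such a map could be consumed to spread a project's pods across nodes without exceeding any node's remaining capacity:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NodeMapSketch {
    // Hypothetical illustration: spread "parallelism" pod slots over the nodes
    // returned by getNodeMapToUse(), never exceeding a node's remaining capacity.
    static Map<String, Integer> assignSlots(Map<String, Integer> nodeMap, int parallelism) {
        Map<String, Integer> assignment = new LinkedHashMap<>();
        int remaining = parallelism;
        for (Map.Entry<String, Integer> entry : nodeMap.entrySet()) {
            if (remaining <= 0) break;
            int take = Math.min(entry.getValue(), remaining);
            if (take > 0) assignment.put(entry.getKey(), take);
            remaining -= take;
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> nodeMap = new LinkedHashMap<>();
        nodeMap.put("node-1", 2); // remaining parallelism reported for node-1
        nodeMap.put("node-2", 4);
        System.out.println(assignSlots(nodeMap, 3)); // {node-1=2, node-2=1}
    }
}
```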

+ 69 - 55
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -1,6 +1,7 @@
 package com.css.simulation.resource.scheduler.manager;
 
 import api.common.pojo.constants.DictConstants;
+import api.common.pojo.dto.ProjectMessageDTO;
 import api.common.util.*;
 import com.css.simulation.resource.scheduler.mapper.AutoSubProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.IndexMapper;
@@ -18,6 +19,8 @@ import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.util.Yaml;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -40,6 +43,9 @@ import java.util.stream.Collectors;
 @Component
 @Slf4j
 public class TaskManager {
+
+    @Value("${scheduler.linux-path.pod-yaml-directory}")
+    String podYamlDirectory;
     @Value("${minio.bucket-name}")
     String bucketName;
     @Value("${scheduler.linux-path.score-py}")
@@ -81,7 +87,9 @@ public class TaskManager {
     @Resource
     ApiClient apiClient;
 
-    public static final String SIMULATION_NAMESPACE = "default";
+    @Value("${scheduler.kubernetes.namespace}")
+    String kubernetesNamespace;
+
 
     @SneakyThrows
     @Transactional
@@ -94,13 +102,11 @@ public class TaskManager {
             taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
             return false;
         } else { // 出错和结束的 pod 都直接删除
-            KubernetesUtil.deletePod(apiClient, SIMULATION_NAMESPACE, podName);
+            // -------------------------------- 处理状态 --------------------------------
+            //TODO 暂时不用重试操作
+            KubernetesUtil.deletePod(apiClient, kubernetesNamespace, podName);
             log.info("TaskManager--state 修改任务 " + taskId + "的状态为 " + state + ",pod 名称为:" + podName + ",并删除 pod。");
             if ("Aborted".equals(state)) {
-                if (retry(projectId, taskId, redisPrefix.getTaskRetryKey(), redisPrefix.getTaskMessageKey())) {
-                    taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
-                    return false;
-                }
                 String minioPathOfErrorLog = resultPathMinio + projectId + "/" + taskId + "error.log";
                 boolean objectExist = MinioUtil.isObjectExist(minioClient, bucketName, minioPathOfErrorLog);
                 String targetEvaluate;
@@ -123,44 +129,41 @@ public class TaskManager {
                 }
                 taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), targetEvaluate);
             } else if ("Terminated".equals(state)) {
-                if (retry(projectId, taskId, redisPrefix.getTaskRetryKey(), redisPrefix.getTaskMessageKey())) {
-                    taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
-                    return false;
-                }
                 taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_3);
             } else if ("PendingAnalysis".equals(state)) {
                 taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
             }
-        }
-        int taskNum = taskMapper.selectTaskNumByProjectId(projectId);
-        int endTaskNum = taskMapper.selectEndTaskNumByProjectId(projectId);    // 查询已结束的任务 'Aborted', 'PendingAnalysis', 'Terminated'
-        manualProjectMapper.updateTaskCompleted(projectId, endTaskNum);
-        log.info("TaskManager--isProjectCompleted 项目 " + projectId + " 完成进度为:" + endTaskNum + "/" + taskNum);
-        // 已结束任务数等于所有任务数量,才会准备打分;否则退出。
-        return taskNum == endTaskNum;
-    }
-
-    public boolean retry(String projectId, String taskId, String taskRetryKey, String taskMessageKey) {
-        try {
-            log.info("TaskManager--retry 重试操作收到的参数为:projectId=" + projectId + ",taskId=" + taskId);
-            //1 首先查看任务是否重试过 3 次
-            String retryString = stringRedisTemplate.opsForValue().get(taskRetryKey);
-            int retry = Integer.parseInt(Objects.requireNonNull(retryString));
-            //2 如果重试次数超过 3 次,则不再重试
-            if (retry >= 3) {
+            // -------------------------------- 判断项目是否结束 --------------------------------
+            ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()), ProjectMessageDTO.class);
+            int taskTotal = projectMessageDTO.getTaskTotal();
+            int taskCompleted = projectMessageDTO.getTaskCompleted();
+            log.info("TaskManager--isProjectCompleted 项目 " + projectId + " 完成进度为:" + (taskCompleted + 1) + "/" + taskTotal);
+            if (taskCompleted + 1 == taskTotal) {
+                return true;
+            } else {
+                projectMessageDTO.setTaskCompleted(taskCompleted + 1);
+                stringRedisTemplate.opsForValue().set(redisPrefix.getProjectRunningKey(), JsonUtil.beanToJson(projectMessageDTO));
                 return false;
             }
-            String taskJson = stringRedisTemplate.opsForValue().get(taskMessageKey);
-            log.info("TaskManager--retry 重试项目 " + projectId + " 的任务 " + taskId + ",重试次数为:" + retry + ",重新发送的消息为:" + taskJson);
-            retry++;
-            stringRedisTemplate.opsForValue().set(taskRetryKey, retry + "");
-            kafkaTemplate.send(projectId, taskJson);
-            return true;
-        } catch (Exception e) {
-            log.error("TaskManager--retry 重试操作报错:", e);
-            return false;
         }
+    }
 
+    /**
+     * Re-create the finished pod under a new name and start it again.
+     *
+     * @param projectId project id
+     * @param podName   name of the pod that has just finished
+     */
+    @SneakyThrows
+    public void createNextPod(String projectId, String podName) {
+        String lastPodString = FileUtil.read(podYamlDirectory + podName + ".yaml");
+        String nextPodName = "project-" + projectId + "-" + StringUtil.getRandomUUID();
+        String nextPodFileName = nextPodName + ".yaml";     // yaml file name of the next pod
+        String nextPodString = lastPodString.replace(podName, nextPodName); // swap the old pod name for the new one (projectId + random UUID)
+        FileUtil.writeStringToLocalFile(nextPodString, podYamlDirectory + nextPodFileName);
+        V1Pod v1Pod = (V1Pod) Yaml.load(nextPodString);
+        // start the pod
+        KubernetesUtil.createPod(apiClient, kubernetesNamespace, v1Pod);
     }
 
     public void prepareScore(String projectRunningKey) {
@@ -416,16 +419,14 @@ public class TaskManager {
 
 
     @SneakyThrows
-    public void done(PrefixTO redisPrefix, SshClient sshClient, ClientSession clientSession, String projectId) {
+    public void done(PrefixTO redisPrefix, SshClient sshClient, ClientSession clientSession, String projectId, String podName) {
 
-        manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成
+        // 关闭 ssh 连接
+        clientSession.close();
+        sshClient.stop();
 
-        Set<String> keys = stringRedisTemplate.keys(redisPrefix.getProjectRunningKey() + "*");
-        if (CollectionUtil.isNotEmpty(keys)) {
-            keys.forEach(key -> stringRedisTemplate.delete(key));
-        } else {
-            log.error("TaskService--taskState 前缀为 " + redisPrefix.getProjectRunningKey() + " 的 key 为空!");
-        }
+        // 更新项目状态为已完成
+        manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成
 
 
         // 删除 kafka topic
@@ -436,19 +437,32 @@ public class TaskManager {
 //        SshUtil.stop(clientKafka, sessionKafka);
 
 
-        // 删除 job
-        SshUtil.execute(clientSession, "kubectl delete job project-" + projectId);
-        clientSession.close();
-        sshClient.stop();
+        Map<String, Integer> nodeMap = projectUtil.getNodeMap();
+        List<String> podList = KubernetesUtil.getPodByPrefix(apiClient, kubernetesNamespace, "project-" + projectId);
+        for (String tempPodName : podList) {
+            // 删除该 project 下的所有 pod
+            KubernetesUtil.deletePod(apiClient, kubernetesNamespace, tempPodName);
+            // 归还并行度
+            String tempNodeName = stringRedisTemplate.opsForValue().get("pod:" + tempPodName + ":node");
+            stringRedisTemplate.delete("pod:" + tempPodName + ":node");
+            int restParallelism = nodeMap.get(tempNodeName);
+            nodeMap.put(tempNodeName, restParallelism + 1);
+        }
+        nodeMap.forEach((tempNodeName, tempParallelism) -> {
+            String restParallelismKey = "node:" + tempNodeName + ":parallelism";
+            stringRedisTemplate.opsForValue().set(restParallelismKey, tempParallelism + "");
+        });
+
+        // 删除 redis 中的键值对
+        Set<String> keys = stringRedisTemplate.keys(redisPrefix.getProjectRunningKey() + "*");
+        if (CollectionUtil.isNotEmpty(keys)) {
+            keys.forEach(key -> stringRedisTemplate.delete(key));
+        } else {
+            log.error("TaskService--taskState 前缀为 " + redisPrefix.getProjectRunningKey() + " 的 key 为空!");
+        }
 
-        // 归还并行度
-        String nodeOfProject = "project:" + projectId + ":node";
-        String restParallelismKey = "node:" + stringRedisTemplate.opsForValue().get(nodeOfProject) + ":parallelism";
-        String usedParallelismKey = "project:" + projectId + ":parallelism";
-        long restParallelism = Long.parseLong(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(restParallelismKey)));
-        long usedParallelism = Long.parseLong(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(usedParallelismKey)));
-        stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism + usedParallelism) + "");
 
     }
 
+
 }
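
In place of the removed Kafka-based retry, task completion is now tracked by a counter inside the ProjectMessageDTO cached under the project's running key. A minimal sketch of that read-modify-write cycle, assuming a DTO with taskTotal/taskCompleted fields and using Jackson directly instead of the project's JsonUtil helper:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.data.redis.core.StringRedisTemplate;

public class ProgressSketch {
    // Hypothetical DTO mirroring the fields used by isProjectCompleted().
    public static class ProjectProgress {
        public int taskTotal;
        public int taskCompleted;
    }

    private final StringRedisTemplate redis;
    private final ObjectMapper mapper = new ObjectMapper();

    public ProgressSketch(StringRedisTemplate redis) {
        this.redis = redis;
    }

    /** Returns true when the task that just finished was the last one. */
    public boolean markOneTaskDone(String projectRunningKey) throws Exception {
        String json = redis.opsForValue().get(projectRunningKey);
        ProjectProgress progress = mapper.readValue(json, ProjectProgress.class);
        if (progress.taskCompleted + 1 == progress.taskTotal) {
            return true; // project finished, caller moves on to scoring
        }
        progress.taskCompleted = progress.taskCompleted + 1;
        redis.opsForValue().set(projectRunningKey, mapper.writeValueAsString(progress));
        return false;
    }
}
```

The sketch keeps the same non-atomic read-modify-write shape as the commit; if several pods could report completion at the same moment, an atomic counter (for example a Redis INCR) would be the safer variant.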

+ 17 - 62
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -12,10 +12,8 @@ import com.css.simulation.resource.scheduler.pojo.po.ClusterPO;
 import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.pojo.po.UserPO;
-import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.service.TaskService;
-import com.css.simulation.resource.scheduler.util.KubernetesUtil;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import io.kubernetes.client.openapi.ApiClient;
 import lombok.SneakyThrows;
@@ -28,6 +26,7 @@ import org.springframework.stereotype.Component;
 import javax.annotation.Resource;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 @Component
@@ -82,7 +81,6 @@ public class ProjectScheduler {
 
         for (ProjectPO project : allProject) {
             String projectId = project.getId();
-            String projectType = project.getProjectType();
             long parallelism = Long.parseLong(project.getParallelism());
             String userId = project.getCreateUserId();
             UserPO userPO = userMapper.selectById(userId);
@@ -124,7 +122,7 @@ public class ProjectScheduler {
             }
             // -------------------------------- 项目没有执行说明等待中 --------------------------------
             if (isSystem) { // 系统管理员直接执行
-                run(clusterId, projectId, projectType, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
+                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
                 return;
             }
             int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
@@ -142,37 +140,36 @@ public class ProjectScheduler {
                     }
                     if (parallelismSum < simulationLicenseNumber) {
                         if (parallelismSum + parallelism < simulationLicenseNumber) {
-                            run(clusterId, projectId, projectType, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
+                            run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
                             return;
                         }
                     }
                 }
             }
             if ((CollectionUtil.isEmpty(clusterRunningKeySet) || CollectionUtil.isEmpty(runningProjectSet)) && parallelism < simulationLicenseNumber) {
-                run(clusterId, projectId, projectType, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
+                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
             }
         }
     }
 
 
-    public void run(String clusterId, String projectId, String projectType, String projectWaitingKey, String projectRunningKey, long parallelism) {
-        String projectJson = stringRedisTemplate.opsForValue().get(projectWaitingKey);
-        if (StringUtil.isEmpty(projectJson)) {
-            log.error("ProjectScheduler--run 项目 " + projectId + " 的开始消息查询失败,key 为:" + projectWaitingKey);
+    @SneakyThrows
+    public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey, long parallelism) {
+        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(stringRedisTemplate.opsForValue().get(projectWaitingKey), ProjectMessageDTO.class);
+
+        //1 获取所有节点的剩余可用并行度
+        Map<String, Integer> nodeMap = projectUtil.getNodeMapToUse(parallelism);
+        if (nodeMap.size() == 0) {
             return;
         }
-        //1 获取一个剩余可用并行度最大的节点
-        KubernetesNodeTO maxParallelismPNodeTO = projectUtil.getMaxParallelismNode();
-        String maxRestParallelismNode = maxParallelismPNodeTO.getName();
-        Long maxRestParallelism = maxParallelismPNodeTO.getMaxParallelism();
+        //2 计算实际可用并行度
+        int parallelismSum = nodeMap.keySet().stream().mapToInt(nodeMap::get).sum();
 
         //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
-        if (maxRestParallelism > parallelism) {
-            log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
-            projectConsumer.parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, parallelism);
-        } else if (maxRestParallelism > 0) {
-            log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
-            projectConsumer.parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, maxRestParallelism);
+        if (parallelismSum > 0L) {
+            log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点 " + nodeMap + " 上以并行度 " + parallelismSum + " 执行!");
+            projectMessageDTO.setCurrentParallelism(parallelismSum);    // 设置实际的并行度
+            projectConsumer.parseProject(nodeMap, projectMessageDTO, "cluster:" + clusterId, projectRunningKey);
         }
     }
 
@@ -208,46 +205,4 @@ public class ProjectScheduler {
         }
     }
 
-    /**
-     * 解决 pod 莫名全部关闭但是 job 还在的问题
-     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
-     */
-    @Scheduled(fixedDelay = 30 * 1000)
-    @SneakyThrows
-    public void projectCheck() {
-        //1 查询出正在运行中的 project
-        List<ProjectPO> projectIdList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
-        //2 根据 projectId 获取 pod
-        for (ProjectPO project : projectIdList) {
-            String projectId = project.getId();
-            String userId = project.getCreateUserId();
-            PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectId(userId, projectId);
-            String lastNowString = stringRedisTemplate.opsForValue().get(redisPrefix.getProjectCheckKey());
-            // 获取该项目的 job 中正在运行的 pod 数量
-            List<String> allJobList = KubernetesUtil.getJob(apiClient, "default");
-            List<String> allPodList = KubernetesUtil.getPod(apiClient, "default");
-            long jobNumber = allJobList.stream().filter(jobName -> jobName.contains(projectId)).count();
-            long podNumber = allPodList.stream().filter(podName -> podName.contains(projectId)).count();
-            // 如果没有检查过且 pod 列表为空,则正式开始检查,设置第一次检查时间
-            if (StringUtil.isEmpty(lastNowString) && jobNumber == 1L && podNumber == 0L) {
-                log.info("ProjectScheduler--projectCheck 开始检查项目 " + projectId);
-                stringRedisTemplate.opsForValue().set(redisPrefix.getProjectCheckKey(), TimeUtil.getNowString());
-                return;
-            }
-            log.info("ProjectScheduler--projectCheck kubernetes 的命名空间 default 中正在运行的 pod 有:" + allPodList + ",其中项目 " + projectId + " 的任务个数为:" + podNumber);
-            //  如果两次检查时间超过了 2 分钟,且仍然没有 pod 执行,则准备重启
-            if (StringUtil.isNotEmpty(lastNowString) && jobNumber == 1L && podNumber == 0L && Long.parseLong(TimeUtil.getNowString()) - Long.parseLong(lastNowString) > (long) 120 * 1000) {
-                // 删除检查
-                stringRedisTemplate.delete(redisPrefix.getProjectCheckKey());
-                try {
-                    // 删除 job
-                    KubernetesUtil.deleteJob(apiClient, "default", "project-" + projectId);
-                } catch (Exception e) {
-                    log.info("ProjectScheduler--projectCheck 删除项目 " + projectId + " 的 job 失败,可能是已经删除!");
-                }
-                log.info("ProjectScheduler--projectCheck 重新执行项目" + projectId);
-                KubernetesUtil.applyYaml(hostname, username, password, jobYaml + "project-" + projectId + ".yaml");
-            }
-        }
-    }
 }
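
The license guard in the surrounding context lines only starts a waiting project while the running projects' total parallelism plus the new project's parallelism stays below the cluster's simulation license count. A small worked sketch of that check (the method name and the numbers are illustrative):

```java
public class LicenseCheckSketch {
    // Mirrors the guard in ProjectScheduler: a waiting project is started only if
    // the running projects' parallelism plus its own stays below the license count.
    static boolean canRun(int licenseNumber, int runningParallelismSum, long requestedParallelism) {
        return runningParallelismSum < licenseNumber
                && runningParallelismSum + requestedParallelism < licenseNumber;
    }

    public static void main(String[] args) {
        System.out.println(canRun(10, 7, 4)); // false: 7 + 4 is not below 10, the project keeps waiting
        System.out.println(canRun(10, 7, 2)); // true: 7 + 2 stays below 10, run() is called
    }
}
```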

+ 23 - 28
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -13,10 +13,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.openapi.models.V1Container;
-import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1Pod;
-import io.kubernetes.client.openapi.models.V1PodSpec;
 import io.kubernetes.client.util.Yaml;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
@@ -31,6 +28,7 @@ import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -87,10 +85,9 @@ public class ProjectService {
      * @param nodeMap           节点列表以及剩余可用并行度
      * @param projectMessageDTO 初始接收到的项目启动信息
      * @param clusterPrefix     clusterPrefix
-     * @param projectRunningKey projectRunningKey
      */
     @Transactional
-    public void prepare(Map<String, Integer> nodeMap, ProjectMessageDTO projectMessageDTO, String clusterPrefix, String projectRunningKey) {
+    public void prepare(Map<String, Integer> nodeMap, ProjectMessageDTO projectMessageDTO, String clusterPrefix) {
         String projectId = projectMessageDTO.getProjectId();
         String projectType = projectMessageDTO.getType();
         //1 将指定 node 的并行度减少
@@ -417,31 +414,29 @@ public class ProjectService {
      */
     @SneakyThrows
     public void createPod(String projectId, Map<String, Integer> nodeMap, String algorithmDockerImage) {
-        V1Pod v1Pod = (V1Pod) Yaml.load(new File(podTemplateYaml));
-        V1ObjectMeta metadata = v1Pod.getMetadata();
-        V1PodSpec spec = v1Pod.getSpec();
-        List<V1Container> containers = spec.getContainers();
-        containers.forEach(container -> {
-            if ("vtd-container".equals(container.getName())) {
-                container.setName("vtd-" + projectId);
-                List<String> commandList = container.getCommand();
-                for (int i = 0; i < commandList.size(); i++) {
-                    if ("kafkaTopic".equals(commandList.get(i))) {
-                        commandList.set(i, projectId);
-                    }
-                }
-            }
-            if ("algorithm-container".equals(container.getName())) {
-                container.setName("vtd-" + projectId);
-                container.setImage(algorithmDockerImage);
-            }
-
-        });
+        String podString = FileUtil.read(new File(podTemplateYaml));
+        String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
+        String replace1 = replace0.replace("algorithm-container", "algorithm-" + projectId);
+        String replace2 = replace1.replace("algorithm-image", algorithmDockerImage);
+        String podTemplateStringOfProject = replace2.replace("kafkaTopic", projectId);     // 消息主题名称为 projectId
+        String podTemplateFileNameOfProject = "project-" + projectId + ".yaml";     // 模板文件名称
+        FileUtil.writeStringToLocalFile(podTemplateStringOfProject, podYamlDirectory + podTemplateFileNameOfProject);
 
         nodeMap.forEach((nodeName, parallelism) -> {
-            String tempPodName = "project-" + projectId + StringUtil.getRandomUUID();
-            metadata.setName(tempPodName);   // pod 名称包括 projectId 和 随机字符串
-            spec.setNodeName(nodeName);          // 指定 pod 运行节点
+            String podName = "project-" + projectId + "-" + StringUtil.getRandomUUID();
+            stringRedisTemplate.opsForValue().set("pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
+            String tempPodFileNameOfProject = podName + ".yaml";     // 模板文件名称
+            // -------------------------------- render this node's pod and start it --------------------------------
+            String tempReplace4 = podTemplateStringOfProject.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
+            String tempPodString = tempReplace4.replace("node-name", nodeName);     // 指定 pod 运行节点
+            log.info("ProjectConsumer--parseManualProject 在节点 " + nodeName + " 开始执行 pod:" + tempPodString);
+            V1Pod v1Pod;
+            try {
+                FileUtil.writeStringToLocalFile(tempPodString, podYamlDirectory + tempPodFileNameOfProject);
+                v1Pod = (V1Pod) Yaml.load(tempPodString);
+            } catch (IOException e) {
+                throw new RuntimeException("ProjectService--createPod 创建 pod 失败,项目为:" + projectId, e);
+            }
             //  启动
             KubernetesUtil.createPod(apiClient, kubernetesNamespace, v1Pod);
         });
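
createPod now renders the pod manifest by plain string substitution on the yaml template rather than by mutating the V1Pod object model. A minimal sketch of that rendering step, assuming a template that contains the placeholders referenced in the diff (vtd-container, algorithm-container, algorithm-image, kafkaTopic, pod-name, node-name):

```java
public class PodTemplateSketch {
    // Hypothetical helper showing the same replace chain used in createPod().
    static String renderPodYaml(String template, String projectId, String podName,
                                String nodeName, String algorithmImage) {
        return template
                .replace("vtd-container", "vtd-" + projectId)
                .replace("algorithm-container", "algorithm-" + projectId)
                .replace("algorithm-image", algorithmImage)
                .replace("kafkaTopic", projectId)   // Kafka topic name is the projectId
                .replace("pod-name", podName)       // projectId plus a random UUID
                .replace("node-name", nodeName);    // pins the pod to one node
    }

    public static void main(String[] args) {
        String template = "metadata:\n  name: pod-name\nspec:\n  nodeName: node-name\n"
                + "  containers:\n  - name: vtd-container\n  - name: algorithm-container\n"
                + "    image: algorithm-image\n";
        System.out.println(renderPodYaml(template, "p123", "project-p123-abc", "node-1", "repo/algo:1.0"));
    }
}
```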

+ 10 - 28
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -1,12 +1,8 @@
 package com.css.simulation.resource.scheduler.service;
 
-import api.common.pojo.constants.DictConstants;
 import api.common.util.SshUtil;
 import com.css.simulation.resource.scheduler.manager.TaskManager;
-import com.css.simulation.resource.scheduler.mapper.AutoSubProjectMapper;
-import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
@@ -33,10 +29,7 @@ public class TaskService {
     TaskManager taskManager;
     @Resource
     TaskMapper taskMapper;
-    @Resource
-    ManualProjectMapper manualProjectMapper;
-    @Resource
-    AutoSubProjectMapper autoSubProjectMapper;
+
     @Resource
     ProjectUtil projectUtil;
 
@@ -44,35 +37,24 @@ public class TaskService {
 
     @SneakyThrows
     public void taskState(String taskId, String state, String podName) {
-        SshClient sshClient = SshUtil.getClient();
-        ClientSession clientSession = SshUtil.getSession(sshClient, hostname, username, password);
         log.info("TaskService--state 接收到参数为:taskId=" + taskId + ",state=" + state + ",podName=" + podName);
         TaskPO taskPO = taskMapper.selectById(taskId);
         if (taskPO == null) {
-            log.error("TaskManager--isProjectCompleted 接收到已删除但还在执行的任务:taskId=" + taskId + ",state=" + state + ",podName=" + podName);
+//            log.error("TaskManager--isProjectCompleted 接收到已删除但还在执行的任务:taskId=" + taskId + ",state=" + state + ",podName=" + podName);
             return;
         }
-        String projectId = taskPO.getPId();
-        String projectType = null;
-
-        // 根据 projectId 获取 projectType
-        ProjectPO manualProjectPO = manualProjectMapper.selectById(projectId);
-        ProjectPO autoSubProjectPO = autoSubProjectMapper.selectById(projectId);
-        if (manualProjectPO != null) {
-            projectType = DictConstants.PROJECT_TYPE_MANUAL;
-        } else if (autoSubProjectPO != null) {
-            projectType = DictConstants.PROJECT_TYPE_AUTO_SUB;
-        }
-
-        String userId = taskPO.getCreateUserId();
-        PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaksId(userId, projectId, taskId);
+        String projectId = taskPO.getPId(); // 项目 id
+        String projectType = projectUtil.getProjectTypeByProjectId(projectId);  // 项目类型
+        String userId = taskPO.getCreateUserId();   // 用户 id
+        PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaksId(userId, projectId, taskId);  // 项目前缀
         //1 判断项目是否已完成
         boolean projectCompleted = taskManager.isProjectCompleted(redisPrefix, projectId, taskId, state, podName);
-        if (!projectCompleted) {
-            clientSession.close();
-            sshClient.stop();
+        if (!projectCompleted) { // 项目没有完成则启动下一个 pod
+            taskManager.createNextPod(projectId, podName);
             return;
         }
+        SshClient sshClient = SshUtil.getClient();
+        ClientSession clientSession = SshUtil.getSession(sshClient, hostname, username, password);
         //2 准备打分
         log.info("TaskService--taskState 项目 " + projectId + "准备打分!");
         taskManager.prepareScore(redisPrefix.getProjectRunningKey());

+ 57 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/KubernetesUtil.java

@@ -119,6 +119,63 @@ public class KubernetesUtil {
         return list.getItems().stream().map(pod -> Objects.requireNonNull(pod.getMetadata()).getName()).collect(Collectors.toList());
     }
 
+    /**
+     * Get the names of all pods whose name starts with the given prefix.
+     *
+     * @param apiClient     api client
+     * @param namespaceName namespace name ("all" scans every namespace, null or empty falls back to "default")
+     * @param prefix        pod name prefix to filter by
+     * @return matching pod names
+     */
+    @SneakyThrows
+    public static List<String> getPodByPrefix(ApiClient apiClient, String namespaceName, String prefix) {
+
+        CoreV1Api coreV1Api = new CoreV1Api(apiClient);
+        V1PodList v1PodList;
+        if (namespaceName == null || "".equals(namespaceName) || "default".equals(namespaceName)) {
+            v1PodList = coreV1Api.listNamespacedPod(
+                    "default",
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+        } else if ("all".equals(namespaceName)) {
+            v1PodList = coreV1Api.listPodForAllNamespaces(
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+        } else {
+            v1PodList = coreV1Api.listNamespacedPod(
+                    namespaceName,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+        }
+        return v1PodList.getItems().stream()
+                .map(pod -> Objects.requireNonNull(pod.getMetadata()).getName())
+                .filter(Objects::nonNull)
+                .filter(podName -> podName.startsWith(prefix))
+                .collect(Collectors.toList());
+    }
 
     /**
      * 删除 pod
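
A short usage sketch of the new getPodByPrefix helper, mirroring how TaskManager.done() cleans up a project's pods; the namespace literal here is an assumption, the real value comes from scheduler.kubernetes.namespace:

```java
import com.css.simulation.resource.scheduler.util.KubernetesUtil;
import io.kubernetes.client.openapi.ApiClient;
import java.util.List;

public class PodCleanupSketch {
    // Illustrative caller: list every pod of the project, then delete each one
    // with the existing deletePod helper in KubernetesUtil.
    static void deleteProjectPods(ApiClient apiClient, String namespace, String projectId) {
        List<String> projectPods = KubernetesUtil.getPodByPrefix(apiClient, namespace, "project-" + projectId);
        for (String podName : projectPods) {
            KubernetesUtil.deletePod(apiClient, namespace, podName);
        }
    }
}
```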

+ 85 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -10,6 +10,7 @@ import com.css.simulation.resource.scheduler.mapper.AutoSubProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.UserMapper;
+import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.UserPO;
 import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
@@ -45,6 +46,19 @@ public class ProjectUtil {
     @Resource
     StringRedisTemplate stringRedisTemplate;
 
+    public String getProjectTypeByProjectId(String projectId) {
+        String projectType = null;
+        ProjectPO manualProjectPO = manualProjectMapper.selectById(projectId);
+        ProjectPO autoSubProjectPO = autoSubProjectMapper.selectById(projectId);
+        if (manualProjectPO != null) {
+            projectType = DictConstants.PROJECT_TYPE_MANUAL;
+        } else if (autoSubProjectPO != null) {
+            projectType = DictConstants.PROJECT_TYPE_AUTO_SUB;
+        }
+        return projectType;
+    }
+
+
     /**
      * 获取正在运行的项目的并行度总和
      *
@@ -72,13 +86,82 @@ public class ProjectUtil {
     }
 
 
+
+
+    /**
+     * Get the node with the largest remaining available parallelism.
+     *
+     * @return the node with the most remaining parallelism, or null if no node has any left
+     */
+    public KubernetesNodeTO getNodeWithMaxRestParallelism() {
+        List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
+        List<KubernetesNodeTO> restNodeList = new ArrayList<>();    // 剩余并行度的节点列表
+        for (KubernetesNodeTO kubernetesNodeTO : initialNodeList) {
+            String name = kubernetesNodeTO.getName();
+            int maxParallelism = kubernetesNodeTO.getMaxParallelism();
+            String restParallelismKey = "node:" + name + ":parallelism";
+            String restParallelismString = stringRedisTemplate.opsForValue().get(restParallelismKey);
+            int restParallelism;
+            if (restParallelismString == null) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
+                restParallelism = maxParallelism;
+                stringRedisTemplate.opsForValue().set(restParallelismKey, restParallelism + "");
+            } else {
+                restParallelism = Integer.parseInt(restParallelismString);
+                kubernetesNodeTO.setMaxParallelism(restParallelism);
+            }
+            if (restParallelism > 0) {
+                restNodeList.add(kubernetesNodeTO);
+            }
+        }
+        int restNodeNumber = restNodeList.size();
+        if (restNodeNumber > 1) {
+            restNodeList.sort((o1, o2) -> o2.getMaxParallelism() - o1.getMaxParallelism());
+            return restNodeList.get(0);
+        } else if (restNodeNumber == 1) {
+            return restNodeList.get(0);
+        } else {
+            return null;
+        }
+    }
+
+
+    /**
+     * Get every configured node together with its remaining available parallelism.
+     *
+     * @return node map (node name -> remaining parallelism)
+     */
+    public Map<String, Integer> getNodeMap() {
+        List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
+        Map<String, Integer> resultNodeMap = new HashMap<>();    // 用于执行的节点映射(节点名,并行度)
+        for (KubernetesNodeTO kubernetesNodeTO : initialNodeList) {
+            String nodeName = kubernetesNodeTO.getName();
+            int maxParallelism = kubernetesNodeTO.getMaxParallelism();
+            String restParallelismKey = "node:" + nodeName + ":parallelism";
+            String restParallelismString = stringRedisTemplate.opsForValue().get(restParallelismKey);
+            int restParallelism;
+            if (restParallelismString == null) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
+                restParallelism = maxParallelism;
+                stringRedisTemplate.opsForValue().set(restParallelismKey, restParallelism + "");
+            } else {
+                restParallelism = Integer.parseInt(restParallelismString);
+                kubernetesNodeTO.setMaxParallelism(restParallelism);
+            }
+            resultNodeMap.put(nodeName, restParallelism);
+        }
+        return resultNodeMap;
+    }
+
+
+
     /**
      * 根据并行度获取用于执行的节点列表
      * 根据剩余可用并行度降序排序
      *
      * @return 节点映射(节点名,并行度)
      */
-    public Map<String, Integer> getNodeMap(long parallelism) {
+    public Map<String, Integer> getNodeMapToUse(long parallelism) {
         List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
         List<KubernetesNodeTO> restNodeList = new ArrayList<>();    // 剩余并行度的节点列表
         Map<String, Integer> resultNodeMap = new HashMap<>();    // 用于执行的节点映射(节点名,并行度)
@@ -103,7 +186,7 @@ public class ProjectUtil {
 
         for (int i = 0; i < parallelism; i++) {
             // 每次降序排序都取剩余并行度最大的一个。
-            restNodeList.sort((o1, o2) -> (int) (o2.getMaxParallelism() - o1.getMaxParallelism()));
+            restNodeList.sort((o1, o2) -> o2.getMaxParallelism() - o1.getMaxParallelism());
             KubernetesNodeTO tempNode = restNodeList.get(0);
             String tempNodeName = tempNode.getName();
             int tempParallelism = tempNode.getMaxParallelism();
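
The visible part of getNodeMapToUse hands out the requested parallelism one slot at a time, always to the node with the most remaining capacity. Since the tail of the method is cut off by the hunk, the loop below is a hypothetical re-implementation that assumes each iteration decrements the chosen node and records the slot in the result map:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GreedyAllocationSketch {

    // Minimal stand-in for KubernetesNodeTO: node name plus remaining parallelism.
    static class Node {
        final String name;
        int rest;
        Node(String name, int rest) { this.name = name; this.rest = rest; }
    }

    // Assumption: the unshown tail of getNodeMapToUse() decrements the picked node
    // and counts the slot in the result map; this loop mirrors that behaviour.
    static Map<String, Integer> allocate(List<Node> nodes, long parallelism) {
        Map<String, Integer> result = new HashMap<>();
        for (int i = 0; i < parallelism; i++) {
            nodes.sort((o1, o2) -> o2.rest - o1.rest);   // node with most remaining slots first
            Node best = nodes.get(0);
            if (best.rest <= 0) {
                break;                                   // no capacity left anywhere
            }
            best.rest--;
            result.merge(best.name, 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Node> nodes = new ArrayList<>(List.of(new Node("node-1", 3), new Node("node-2", 2)));
        System.out.println(allocate(nodes, 4)); // e.g. {node-1=2, node-2=2}
    }
}
```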