martin 3 anni fa
parent
commit
18fdb15a33

+ 12 - 29
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -3,7 +3,9 @@ package com.css.simulation.resource.scheduler.consumer;
 
 import api.common.pojo.constants.DictConstants;
 import api.common.pojo.dto.ProjectMessageDTO;
-import api.common.util.*;
+import api.common.util.CollectionUtil;
+import api.common.util.JsonUtil;
+import api.common.util.StringUtil;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
@@ -12,8 +14,6 @@ import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.sshd.client.SshClient;
-import org.apache.sshd.client.session.ClientSession;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
@@ -81,7 +81,6 @@ public class ProjectConsumer {
     ProjectUtil projectUtil;
 
 
-
     /**
      * 任务运行前首先判断用户是否拥有可分配资源
      *
@@ -205,31 +204,15 @@ public class ProjectConsumer {
         // -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
         String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
-        int completions = scenePOList.size();     // 结束标
-        log.info("ProjectConsumer--parseManualProject 项目 " + projectId + " 的完成度为:" + completions);
-        log.info("ProjectConsumer--parseManualProject 项目 " + projectId + " 的并行度为:" + parallelism);
-        String jobTemplateYamlPathSource = jobTemplate + "job-template.yaml";
-        String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
-        String yamlSource = FileUtil.read(jobTemplateYamlPathSource);
-        log.info("ProjectConsumer--parseManualProject 模板文件为:" + yamlSource);
-        String replace0 = yamlSource.replace("job-cloud-simulation", "project-" + projectId);
-        String replace1 = replace0.replace("vtd-container", "vtd-" + projectId);
-        String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
-        String replace3 = replace2.replace("algorithm-image", algorithmDockerImage);
-        String replace4 = replace3.replace("projectId", projectId);
-        String replace5 = replace4.replace("completions-number", completions + "");
-        String replace6 = replace5.replace("parallelism-number", parallelism + "");
-        String replace7 = replace6.replace("apiVers1on", "apiVersion");
-        String replace8 = replace7.replace("1atch/v1", "batch/v1");
-        log.info("ProjectConsumer--parseManualProject 开始执行 yaml 文件" + replace8);
-        FileUtil.writeStringToLocalFile(replace8, jobTemplateYamlPathTarget);
-
-
-        SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, hostname, username, password);
-        SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
-        session.close();
-        client.stop();
+        projectService.transferAndRunYaml(
+                jobTemplate + "job-template.yaml",
+                projectId,
+                algorithmDockerImage,
+                scenePOList.size(),
+                parallelism,
+                jobYaml + "project-" + projectId + ".yaml"
+        );
+
     }
 
 //    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")

+ 82 - 49
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -2,7 +2,10 @@ package com.css.simulation.resource.scheduler.scheduler;
 
 import api.common.pojo.constants.DictConstants;
 import api.common.pojo.dto.ProjectMessageDTO;
-import api.common.util.*;
+import api.common.util.CollectionUtil;
+import api.common.util.JsonUtil;
+import api.common.util.StringUtil;
+import api.common.util.TimeUtil;
 import com.css.simulation.resource.scheduler.consumer.ProjectConsumer;
 import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
@@ -13,12 +16,11 @@ import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.service.TaskService;
+import com.css.simulation.resource.scheduler.util.KubernetesUtil;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import io.kubernetes.client.openapi.ApiClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.sshd.client.SshClient;
-import org.apache.sshd.client.session.ClientSession;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
@@ -26,7 +28,6 @@ import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -139,17 +140,13 @@ public class ProjectScheduler {
     /**
      * 处理 pod 超时
      * 同时也可处理 pod 莫名关闭,因为关闭之后也会超时
-     *
-     * @throws IOException 超时时间
      */
     @Scheduled(fixedDelay = 60 * 1000)
-    public void taskTimeout() throws IOException {
+    public void taskTimeout() {
         long timeout = 2 * 60 * 1000L;
-        SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, hostname, username, password);
         List<TaskPO> executingTaskList = taskMapper.selectByRunState(DictConstants.TASK_RUNNING);
-        log.info("ProjectScheduler--taskTimeout 正在运行的任务有:" + executingTaskList);
         if (executingTaskList != null && executingTaskList.size() > 0) {
+            log.info("ProjectScheduler--taskTimeout 正在运行的任务有:" + executingTaskList);
             for (TaskPO task : executingTaskList) {
                 String userId = task.getCreateUserId();
                 String projectId = task.getPId();
@@ -169,8 +166,6 @@ public class ProjectScheduler {
                 }
             }
         }
-        session.close();
-        client.stop();
     }
 
     /**
@@ -180,52 +175,90 @@ public class ProjectScheduler {
     @Scheduled(fixedDelay = 30 * 1000)
     @SneakyThrows
     public void projectCheck() {
-        SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, hostname, username, password);
-
         //1 查询出正在运行中的 project
         List<ProjectPO> projectIdList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
-        log.info("ProjectScheduler--projectCheck 运行中的项目有:" + projectIdList);
         //2 根据 projectId 获取 pod
         for (ProjectPO project : projectIdList) {
             String projectId = project.getId();
             String userId = project.getCreateUserId();
             PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectId(userId, projectId);
-            String checkKey = redisPrefix.getProjectCheckKey();
-            String lastNowString = redisTemplate.opsForValue().get(checkKey);
-            String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-            log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
-            int taskNumber = StringUtil.countSubString(podList, "project");
-            if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {  // 为空代表第一次,先设置时间
-                redisTemplate.opsForValue().set(checkKey, TimeUtil.getNowString());
+            String lastNowString = redisTemplate.opsForValue().get(redisPrefix.getProjectCheckKey());
+            // List pods through the Kubernetes API client (an empty namespace name is passed).
+            // NOTE(review): unlike the removed "kubectl get pod | grep project-<id>" call, this list is not
+            // narrowed to the current project, so taskNumber may also count other projects' pods - confirm
+            // what getPod returns and filter by "project-" + projectId if needed.
+            List<String> podList = KubernetesUtil.getPod(apiClient, "");
+            int taskNumber = podList.size();
+            // Not checked before and no pod listed: record the first check time and wait for the next round.
+            if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
+                log.info("ProjectScheduler--projectCheck 开始检查项目 " + projectId);
+                redisTemplate.opsForValue().set(redisPrefix.getProjectCheckKey(), TimeUtil.getNowString());
+                // Move on to the next project; returning here would leave the remaining
+                // running projects unchecked for this scheduling round.
+                continue;
             }
-            if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {  // 非空则开始检查
-                // 判断两次是否超过2分钟
-                //3 如果 pod 为空,则重启 job
-                long lastNow = Long.parseLong(lastNowString);
-                long now = Long.parseLong(TimeUtil.getNowString());
-                if (now - lastNow > (long) 120 * 1000) {
-                    redisTemplate.opsForValue().set(checkKey, TimeUtil.getNowString());
-                    SshUtil.execute(session, "kubectl delete job project-" + projectId);
-                    Thread.sleep(15000);
-                    while (true) {
-                        log.info("ProjectScheduler--projectCheck 准备重启项目 " + projectId);
-                        String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-                        log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
-                        int taskNumber2 = StringUtil.countSubString(podList2, "project");
-                        if (taskNumber2 == 0) {
-                            break;
-                        }
-                    }
-                    Thread.sleep(15000);
-                    log.info("ProjectScheduler--projectCheck 重新执行项目" + projectId);
-                    String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
-                    SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
-                }
+            log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
+            // More than 2 minutes since the recorded check time and still no pod: restart the job.
+            if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0 && Long.parseLong(TimeUtil.getNowString()) - Long.parseLong(lastNowString) > (long) 120 * 1000) {
+                // Clear the recorded check time so the next round starts a fresh check.
+                redisTemplate.delete(redisPrefix.getProjectCheckKey());
+                // Delete the existing job, then re-apply the project's yaml.
+                KubernetesUtil.deleteJob(apiClient, "default", "project-" + projectId);
+                log.info("ProjectScheduler--projectCheck 重新执行项目" + projectId);
+                KubernetesUtil.applyYaml(hostname, username, password, jobYaml + "project-" + projectId + ".yaml");
             }
         }
-        session.close();
-        client.stop();
-
     }
+
+
+//    /**
+//     * 解决 pod 莫名全部关闭但是 job 还在的问题
+//     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
+//     */
+//    @Scheduled(fixedDelay = 30 * 1000)
+//    @SneakyThrows
+//    public void projectCheck() {
+//        SshClient client = SshUtil.getClient();
+//        ClientSession session = SshUtil.getSession(client, hostname, username, password);
+//
+//        //1 查询出正在运行中的 project
+//        List<ProjectPO> projectIdList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
+//        log.info("ProjectScheduler--projectCheck 运行中的项目有:" + projectIdList);
+//        //2 根据 projectId 获取 pod
+//        for (ProjectPO project : projectIdList) {
+//            String projectId = project.getId();
+//            String userId = project.getCreateUserId();
+//            PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectId(userId, projectId);
+//            String checkKey = redisPrefix.getProjectCheckKey();
+//            String lastNowString = redisTemplate.opsForValue().get(checkKey);
+//            String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+//            log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
+//            int taskNumber = StringUtil.countSubString(podList, "project");
+//            if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {  // 为空代表第一次,先设置时间
+//                redisTemplate.opsForValue().set(checkKey, TimeUtil.getNowString());
+//            }
+//            if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {  // 非空则开始检查
+//                // 判断两次是否超过2分钟
+//                //3 如果 pod 为空,则重启 job
+//                long lastNow = Long.parseLong(lastNowString);
+//                long now = Long.parseLong(TimeUtil.getNowString());
+//                if (now - lastNow > (long) 120 * 1000) {
+//                    redisTemplate.opsForValue().set(checkKey, TimeUtil.getNowString());
+//                    SshUtil.execute(session, "kubectl delete job project-" + projectId);
+//                    Thread.sleep(15000);
+//                    while (true) {
+//                        log.info("ProjectScheduler--projectCheck 准备重启项目 " + projectId);
+//                        String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+//                        log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
+//                        int taskNumber2 = StringUtil.countSubString(podList2, "project");
+//                        if (taskNumber2 == 0) {
+//                            break;
+//                        }
+//                    }
+//                    Thread.sleep(15000);
+//                    log.info("ProjectScheduler--projectCheck 重新执行项目" + projectId);
+//                    String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
+//                    SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
+//                }
+//            }
+//        }
+//        session.close();
+//        client.stop();
+//
+//    }
 }

+ 31 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -5,6 +5,7 @@ import api.common.util.*;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.*;
+import com.css.simulation.resource.scheduler.util.KubernetesUtil;
 import com.css.simulation.resource.scheduler.util.MinioUtil;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -316,4 +317,34 @@ public class ProjectService {
         log.info("ProjectService--handleAlgorithm 项目 " + projectId + " 使用的算法镜像为:" + dockerImage);
         return dockerImage;
     }
+
+    /**
+     * Fills in the job template for one project, writes the resulting yaml to a file and applies it.
+     *
+     * @param jobTemplateYamlPathSource path of the job template yaml that is read
+     * @param projectId                 project id substituted into the job name, container names and
+     *                                  the "projectId" placeholder
+     * @param algorithmDockerImage      value substituted for the "algorithm-image" placeholder
+     * @param completions               value substituted for the "completions-number" placeholder
+     * @param parallelism               value substituted for the "parallelism-number" placeholder
+     * @param jobTemplateYamlPathTarget path the filled-in yaml is written to and then applied from
+     */
+    @SneakyThrows
+    public void transferAndRunYaml(String jobTemplateYamlPathSource, String projectId, String algorithmDockerImage, long completions, long parallelism, String jobTemplateYamlPathTarget) {
+        log.info("ProjectService--transferAndRunYaml 项目 " + projectId + " 的完成度为:" + completions);
+        log.info("ProjectService--transferAndRunYaml 项目 " + projectId + " 的并行度为:" + parallelism);
+        String yamlSource = FileUtil.read(jobTemplateYamlPathSource);
+        log.info("ProjectService--transferAndRunYaml 模板文件为:" + yamlSource);
+        // The replacement order is significant: each step also sees text inserted by the previous ones.
+        String yamlTarget = yamlSource
+                .replace("job-cloud-simulation", "project-" + projectId)
+                .replace("vtd-container", "vtd-" + projectId)
+                .replace("algorithm-container", "algorithm-" + projectId)
+                .replace("algorithm-image", algorithmDockerImage)
+                .replace("projectId", projectId)
+                .replace("completions-number", String.valueOf(completions))
+                .replace("parallelism-number", String.valueOf(parallelism))
+                // Turn the "apiVers1on" / "1atch/v1" spellings into the real "apiVersion" / "batch/v1".
+                .replace("apiVers1on", "apiVersion")
+                .replace("1atch/v1", "batch/v1");
+        log.info("ProjectService--transferAndRunYaml 开始执行 yaml 文件" + yamlTarget);
+        FileUtil.writeStringToLocalFile(yamlTarget, jobTemplateYamlPathTarget);
+        // Start the job by applying the written yaml.
+        KubernetesUtil.applyYaml(hostname, username, password, jobTemplateYamlPathTarget);
+    }
 }

+ 13 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/KubernetesUtil.java

@@ -1,5 +1,6 @@
 package com.css.simulation.resource.scheduler.util;
 
+import api.common.util.SshUtil;
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.openapi.ApiException;
 import io.kubernetes.client.openapi.apis.BatchV1Api;
@@ -7,7 +8,10 @@ import io.kubernetes.client.openapi.apis.CoreV1Api;
 import io.kubernetes.client.openapi.models.V1Namespace;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1PodList;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.session.ClientSession;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -15,6 +19,14 @@ import java.util.stream.Collectors;
 public class KubernetesUtil {
 
 
+    /**
+     * Runs {@code kubectl apply -f <path>} over an SSH session to the given host.
+     * The session is closed and the client stopped even when the command fails.
+     *
+     * @param hostname                  host the SSH session is opened to
+     * @param username                  SSH user name
+     * @param password                  SSH password
+     * @param jobTemplateYamlPathTarget path of the yaml file passed to {@code kubectl apply -f}
+     * @throws IOException if one of the SSH helper calls or closing the session fails
+     */
+    public static void applyYaml(String hostname, String username, String password, String jobTemplateYamlPathTarget) throws IOException {
+        SshClient client = SshUtil.getClient();
+        try {
+            ClientSession session = SshUtil.getSession(client, hostname, username, password);
+            try {
+                SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
+            } finally {
+                // Release the session even if the remote command throws.
+                session.close();
+            }
+        } finally {
+            // Stop the client even if opening the session or running the command throws.
+            client.stop();
+        }
+    }
+
     /**
      * 创建 namespace 命名空间
      *
@@ -45,7 +57,7 @@ public class KubernetesUtil {
 
         CoreV1Api api = new CoreV1Api(apiClient);
         V1PodList list;
-        if (namespaceName == null || "".equals(namespaceName)) {
+        if (namespaceName == null || "".equals(namespaceName) || "default".equals(namespaceName)) {
             list = api.listNamespacedPod(
                     "default",
                     null,

+ 1 - 1
simulation-resource-scheduler/src/main/resources/logback-spring.xml

@@ -7,7 +7,7 @@
     <springProfile name="aliyun">
         <property name="LOG_HOME" value="/opt/simulation-cloud/simulation-resource-scheduler/log"/>
     </springProfile>
-    <springProfile name="aliyun">
+    <springProfile name="test">
         <property name="LOG_HOME" value="/opt/simulation-cloud/simulation-resource-scheduler-test/log"/>
     </springProfile>