martin 3 anni fa
parent
commit
0d04711261

+ 27 - 28
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/configuration/kubernetes/KubernetesConfiguration.java

@@ -1,30 +1,29 @@
-//package com.css.simulation.resource.scheduler.configuration.kubernetes;
-//
-//import io.kubernetes.client.openapi.ApiClient;
-//import io.kubernetes.client.util.ClientBuilder;
-//import io.kubernetes.client.util.KubeConfig;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.context.annotation.Configuration;
-//import org.springframework.util.ResourceUtils;
-//
-//import java.io.File;
-//import java.io.FileReader;
-//import java.io.IOException;
-//
-//@Configuration
-//public class KubernetesConfiguration {
-//
-//    @Bean
-//    public ApiClient apiClient() throws IOException {
+package com.css.simulation.resource.scheduler.configuration.kubernetes;
+
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.util.ClientBuilder;
+import io.kubernetes.client.util.KubeConfig;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+@Configuration
+public class KubernetesConfiguration {
+
+    @Bean
+    public ApiClient apiClient() throws IOException {
 //        File config = ResourceUtils.getFile("classpath:kubernetes/config");  // 开发环境可用,生产环境不行,无法从jar 包读取
-////        File config = new File("D:\\idea-project\\simulation-cloud\\simulation-resource-scheduler\\src\\main\\resources\\kubernetes\\config");  //windows
-////        File config = new File("/root/.kube/config");   //linux
-////
-////        ClassPathResource classPathResource = new ClassPathResource("kubernetes/config");
-////        InputStream inputStream = classPathResource.getInputStream();
-////        FileUtil.writeInputStreamToLocalFile();
-//        FileReader fileReader = new FileReader(config);
-//        return ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(fileReader)).build();
-//    }
+//        File config = new File("D:\\idea-project\\simulation-cloud\\simulation-resource-scheduler\\src\\main\\resources\\kubernetes\\config");  //windows
+        File config = new File("/root/.kube/config");   //linux
 //
-//}
+//        ClassPathResource classPathResource = new ClassPathResource("kubernetes/config");
+//        InputStream inputStream = classPathResource.getInputStream();
+//        FileUtil.writeInputStreamToLocalFile();
+        FileReader fileReader = new FileReader(config);
+        return ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(fileReader)).build();
+    }
+
+}

File diff suppressed because it is too large
+ 0 - 377
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java


+ 1 - 7
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/TaskController.java

@@ -52,13 +52,7 @@ public class TaskController {
         taskService.taskState(taskId, state, podName);
     }
 
-//    /**
-//     * 修改任务状态
-//     */
-//    @GetMapping("/stateTest")
-//    public void taskStateTest(@RequestParam("taskId") String taskId, @RequestParam("state") String state, @RequestParam("podName") String podName) {
-//        taskService.taskStateTest(taskId, state, podName);
-//    }
+
 
     /**
      * 任务执行前调用该接口,确定该任务没有被终止

+ 3 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -123,4 +123,7 @@ public interface TaskMapper {
             "from simulation_manual_project_task\n" +
             "where id = #{taskId}")
     String selectProjectIdById(@Param("taskId")String taskId);
+
+    @Delete("delete from simulation_manual_project_task where p_id = #{projectId}")
+    void deleteByProject(@Param("projectId") String projectId);
 }

+ 222 - 222
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -1,222 +1,222 @@
-//package com.css.simulation.resource.scheduler.scheduler;
-//
-//import api.common.pojo.constants.DictConstants;
-//import api.common.util.CollectionUtil;
-//import api.common.util.SshUtil;
-//import api.common.util.StringUtil;
-//import api.common.util.TimeUtil;
-//import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
-//import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-//import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
-//import com.css.simulation.resource.scheduler.service.TaskService;
-//import com.css.simulation.resource.scheduler.util.KubernetesUtil;
-//import io.kubernetes.client.openapi.ApiClient;
-//import io.kubernetes.client.openapi.ApiException;
-//import lombok.extern.slf4j.Slf4j;
-//import org.apache.sshd.client.SshClient;
-//import org.apache.sshd.client.session.ClientSession;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.data.redis.core.StringRedisTemplate;
-//import org.springframework.kafka.core.KafkaTemplate;
-//import org.springframework.scheduling.annotation.Scheduled;
-//import org.springframework.stereotype.Component;
-//
-//import java.io.IOException;
-//import java.util.*;
-//import java.util.stream.Collectors;
-//
-//@Component
-//@Slf4j
-//public class ProjectScheduler {
-//
-//    @Value("${scheduler.manual-project.topic}")
-//    String manualProjectTopic;
-//    @Autowired
-//    StringRedisTemplate redisTemplate;
-//
-//    @Autowired
-//    TaskService taskService;
-//
-//    @Autowired
-//    TaskMapper taskMapper;
-//    @Autowired
-//    ManualProjectMapper projectMapper;
-//    @Value("${scheduler.manual-project.job-yaml}")
-//    String jobYaml;
-//
-//    @Value("${scheduler.score.hostname}")
-//    String hostnameScore;
-//    @Value("${scheduler.score.username}")
-//    String usernameScore;
-//    @Value("${scheduler.score.password}")
-//    String passwordScore;
-//
-//    @Autowired
-//    ApiClient apiClient;
-//
-//    @Autowired
-//    KafkaTemplate<String, String> kafkaTemplate;
-//
-//
-//    /**
-//     * 解决 pod 莫名奇妙关闭的问题
-//     *
-//     * @throws ApiException 异常
-//     */
-//    @Scheduled(fixedDelay = 60 * 1000)
-//    public void retry() throws ApiException {
-//
-//
-//        log.info("------- ProjectScheduler--retry 检查 pod 是否需要重试!");
-//        //1 从 redis 获取 手动运行项目的 key 列表
-//        Set<String> keys = redisTemplate.keys("manualProject:*");
-//
-//        //2 根据 key 列表从 redis 获取 pod 列表
-//        assert keys != null;
-//        List<String> podKeyList = keys.stream().filter(key -> key.contains("pod")).collect(Collectors.toList());
-//        Map<String, String> podNameMapShouldBe = new HashMap<>();
-//        podKeyList.forEach(podKey -> {
-//            String podName = redisTemplate.opsForValue().get(podKey);
-//            podNameMapShouldBe.put(podKey, podName);
-//        });
-//        //3 通过 kubernetes 获取 pod 列表
-//        List<String> podNameListReally = KubernetesUtil.getPod(apiClient, "all");
-//        //4 比对 redis 中的 pod 列表 和 kubernetes 中的 pod 列表,如果有 redis 中存在但 kubernetes 中不存在则准备重试
-//        podNameMapShouldBe.forEach((podKeyShouldBe, podNameShouldBe) -> {
-//            if (!podNameListReally.contains(podNameShouldBe)) {
-//
-//                //4-1 根据 podKey 获取 projectId 和 taskId
-//                String[] split = podKeyShouldBe.split(":");
-//                String projectId = split[1];
-//                String taskId = split[2];
-//                //4-2 根据 projectId 和 taskId 列表从 redis 获取重试次数 retry
-//                String retry = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry");
-//                assert retry != null;
-//                int retryNumber = Integer.parseInt(retry);
-//                if (retryNumber < 3) {
-//                    log.info("------- ProjectScheduler--retry 准备第" + retryNumber + "次重试!");
-//                    String taskJson = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":message");
-//                    int finalRetryNumber = retryNumber + 1;
-//                    redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry", finalRetryNumber + "");
-//                    kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
-//                        // 消息发送到的topic
-//                        assert success != null;
-//                        String topic = success.getRecordMetadata().topic();
-//                        // 消息发送到的分区
-//                        int partition = success.getRecordMetadata().partition();
-//                        // 消息在分区内的offset
-//                        long offset = success.getRecordMetadata().offset();
-//                        log.info("------- ProjectScheduler--retry 项目 " + projectId + "的任务准备第" + finalRetryNumber + "次重试,"
-//                                + "发送消息成功:\n"
-//                                + "主题 topic 为:" + topic + "\n"
-//                                + "分区 partition 为:" + partition + "\n"
-//                                + "偏移量为:" + offset + "\n"
-//                                + "消息体为:" + taskJson);
-//                    }, failure -> {
-//                        log.error("------- ProjectScheduler--retry 发送消息失败:" + failure.getMessage());
-//                    });
-//                }
-//            }
-//        });
-//
-//    }
-//
-//    @Scheduled(fixedDelay = 60 * 1000)
-//    public void tick() throws IOException {
-//
-//        SshClient client = SshUtil.getClient();
-//        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
-//
-//        ArrayList<TaskPO> executingTaskList = taskMapper.selectExecuting();
-//        if (CollectionUtil.isEmpty(executingTaskList)) {
-//            //        log.info("------- ProjectScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
-//            //2 根据 key 查出任务的心跳时间
-//            for (TaskPO task : executingTaskList) {
-//                String taskId = task.getId();
-//                String projectId = task.getPId();
-//                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick");
-////                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
-//                assert s != null;
-//                long tickTime = Long.parseLong(s);
-//                long timeout = 2 * 60 * 1000L;
-//                long now = TimeUtil.getNow();
-//                long difference = now - tickTime;
-////                log.info("------- ProjectScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
-//                if (difference > timeout) {
-//                    String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
-//                    String podDeleteCommand = "kubectl delete pod " + podName;
-//                    if (podName != null) {
-//                        log.info("ProjectScheduler--tick 修改任务 " + taskId + "已超时,状态修改为 Aborted,pod 名称为:" + podName
-//                                + ",并执行删除 pod 命令:" + podDeleteCommand);
-//                        SshUtil.execute(session, podDeleteCommand);
-////            taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql()); // 如果任务 abort 代表项目失败
-//                        taskMapper.updateFailStateWithStopTime(taskId, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_1);
-//                    }
-//                }
-//            }
-//        }
-//        session.close();
-//        client.stop();
-//    }
-//
-//
-//    /**
-//     * 解决 pod 莫名全部关闭但是 job 还在的问题
-//     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
-//     */
-//    @Scheduled(fixedDelay = 30 * 1000)
-//    public void checkProject() throws IOException {
-//        SshClient client = SshUtil.getClient();
-//        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
-//
-//        //1 查询出正在运行中的 project
-//        List<String> projectIdList = projectMapper.selectIdByState("20");
-//        log.info("ProjectScheduler-------checkProject 查询出正在运行中的 project" + projectIdList);
-//        //2 根据 projectId 获取 pod
-//        projectIdList.forEach(projectId -> {
-//            try {
-//                String key = manualProjectTopic + ":" + projectId + ":check";
-//                String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
-//                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-//                log.info("ProjectScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
-//                int taskNumber = StringUtil.countSubString(podList, "project");
-//                if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
-//                    redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
-//                }
-//
-//                if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {
-//                    // 判断两次是否超过2分钟
-//                    //3 如果 pod 为空,则重启 job
-//                    long lastNow = Long.parseLong(lastNowString);
-//                    long now = Long.parseLong(TimeUtil.getNowString());
-//
-//                    if (now - lastNow > (long) 120 * 1000) {
-//                        redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
-//                        SshUtil.execute(session, "kubectl delete job project-" + projectId);
-//                        Thread.sleep(15000);
-//                        while (true) {
-//                            log.info("ProjectScheduler-------checkProject 准备重启项目 " + projectId);
-//                            String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-//                            log.info("ProjectScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
-//                            int taskNumber2 = StringUtil.countSubString(podList2, "project");
-//                            if (taskNumber2 == 0) {
-//                                break;
-//                            }
-//                        }
-//                        Thread.sleep(15000);
-//                        log.info("ProjectScheduler-------checkProject 重新执行项目" + projectId);
-//                        String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
-//                        SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
-//                    }
-//                }
-//            } catch (IOException | InterruptedException e) {
-//                e.printStackTrace();
-//            }
-//        });
-//
-//        session.close();
-//        client.stop();
-//
-//    }
-//}
+package com.css.simulation.resource.scheduler.scheduler;
+
+import api.common.pojo.constants.DictConstants;
+import api.common.util.CollectionUtil;
+import api.common.util.SshUtil;
+import api.common.util.StringUtil;
+import api.common.util.TimeUtil;
+import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
+import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
+import com.css.simulation.resource.scheduler.service.TaskService;
+import com.css.simulation.resource.scheduler.util.KubernetesUtil;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.session.ClientSession;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+@Component
+@Slf4j
+public class ProjectScheduler {
+
+    @Value("${scheduler.manual-project.topic}")
+    String manualProjectTopic;
+    @Autowired
+    StringRedisTemplate redisTemplate;
+
+    @Autowired
+    TaskService taskService;
+
+    @Autowired
+    TaskMapper taskMapper;
+    @Autowired
+    ManualProjectMapper projectMapper;
+    @Value("${scheduler.manual-project.job-yaml}")
+    String jobYaml;
+
+    @Value("${scheduler.score.hostname}")
+    String hostnameScore;
+    @Value("${scheduler.score.username}")
+    String usernameScore;
+    @Value("${scheduler.score.password}")
+    String passwordScore;
+
+    @Autowired
+    ApiClient apiClient;
+
+    @Autowired
+    KafkaTemplate<String, String> kafkaTemplate;
+
+
+    /**
+     * 解决 pod 莫名奇妙关闭的问题
+     *
+     * @throws ApiException 异常
+     */
+    @Scheduled(fixedDelay = 60 * 1000)
+    public void retry() throws ApiException {
+
+
+        log.info("------- ProjectScheduler--retry 检查 pod 是否需要重试!");
+        //1 从 redis 获取 手动运行项目的 key 列表
+        Set<String> keys = redisTemplate.keys("manualProject:*");
+
+        //2 根据 key 列表从 redis 获取 pod 列表
+        assert keys != null;
+        List<String> podKeyList = keys.stream().filter(key -> key.contains("pod")).collect(Collectors.toList());
+        Map<String, String> podNameMapShouldBe = new HashMap<>();
+        podKeyList.forEach(podKey -> {
+            String podName = redisTemplate.opsForValue().get(podKey);
+            podNameMapShouldBe.put(podKey, podName);
+        });
+        //3 通过 kubernetes 获取 pod 列表
+        List<String> podNameListReally = KubernetesUtil.getPod(apiClient, "all");
+        //4 比对 redis 中的 pod 列表 和 kubernetes 中的 pod 列表,如果有 redis 中存在但 kubernetes 中不存在则准备重试
+        podNameMapShouldBe.forEach((podKeyShouldBe, podNameShouldBe) -> {
+            if (!podNameListReally.contains(podNameShouldBe)) {
+
+                //4-1 根据 podKey 获取 projectId 和 taskId
+                String[] split = podKeyShouldBe.split(":");
+                String projectId = split[1];
+                String taskId = split[2];
+                //4-2 根据 projectId 和 taskId 列表从 redis 获取重试次数 retry
+                String retry = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry");
+                assert retry != null;
+                int retryNumber = Integer.parseInt(retry);
+                if (retryNumber < 3) {
+                    log.info("------- ProjectScheduler--retry 准备第" + retryNumber + "次重试!");
+                    String taskJson = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":message");
+                    int finalRetryNumber = retryNumber + 1;
+                    redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry", finalRetryNumber + "");
+                    kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
+                        // 消息发送到的topic
+                        assert success != null;
+                        String topic = success.getRecordMetadata().topic();
+                        // 消息发送到的分区
+                        int partition = success.getRecordMetadata().partition();
+                        // 消息在分区内的offset
+                        long offset = success.getRecordMetadata().offset();
+                        log.info("------- ProjectScheduler--retry 项目 " + projectId + "的任务准备第" + finalRetryNumber + "次重试,"
+                                + "发送消息成功:\n"
+                                + "主题 topic 为:" + topic + "\n"
+                                + "分区 partition 为:" + partition + "\n"
+                                + "偏移量为:" + offset + "\n"
+                                + "消息体为:" + taskJson);
+                    }, failure -> {
+                        log.error("------- ProjectScheduler--retry 发送消息失败:" + failure.getMessage());
+                    });
+                }
+            }
+        });
+
+    }
+
+    @Scheduled(fixedDelay = 60 * 1000)
+    public void tick() throws IOException {
+
+        SshClient client = SshUtil.getClient();
+        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
+
+        ArrayList<TaskPO> executingTaskList = taskMapper.selectExecuting();
+        if (CollectionUtil.isEmpty(executingTaskList)) {
+            //        log.info("------- ProjectScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
+            //2 根据 key 查出任务的心跳时间
+            for (TaskPO task : executingTaskList) {
+                String taskId = task.getId();
+                String projectId = task.getPId();
+                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick");
+//                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
+                assert s != null;
+                long tickTime = Long.parseLong(s);
+                long timeout = 2 * 60 * 1000L;
+                long now = TimeUtil.getNow();
+                long difference = now - tickTime;
+//                log.info("------- ProjectScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
+                if (difference > timeout) {
+                    String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
+                    String podDeleteCommand = "kubectl delete pod " + podName;
+                    if (podName != null) {
+                        log.info("ProjectScheduler--tick 修改任务 " + taskId + "已超时,状态修改为 Aborted,pod 名称为:" + podName
+                                + ",并执行删除 pod 命令:" + podDeleteCommand);
+                        SshUtil.execute(session, podDeleteCommand);
+//            taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql()); // 如果任务 abort 代表项目失败
+                        taskMapper.updateFailStateWithStopTime(taskId, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_1);
+                    }
+                }
+            }
+        }
+        session.close();
+        client.stop();
+    }
+
+
+    /**
+     * 解决 pod 莫名全部关闭但是 job 还在的问题
+     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
+     */
+    @Scheduled(fixedDelay = 30 * 1000)
+    public void checkProject() throws IOException {
+        SshClient client = SshUtil.getClient();
+        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
+
+        //1 查询出正在运行中的 project
+        List<String> projectIdList = projectMapper.selectIdByState("20");
+        log.info("ProjectScheduler-------checkProject 查询出正在运行中的 project" + projectIdList);
+        //2 根据 projectId 获取 pod
+        projectIdList.forEach(projectId -> {
+            try {
+                String key = manualProjectTopic + ":" + projectId + ":check";
+                String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
+                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+                log.info("ProjectScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
+                int taskNumber = StringUtil.countSubString(podList, "project");
+                if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
+                    redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
+                }
+
+                if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {
+                    // 判断两次是否超过2分钟
+                    //3 如果 pod 为空,则重启 job
+                    long lastNow = Long.parseLong(lastNowString);
+                    long now = Long.parseLong(TimeUtil.getNowString());
+
+                    if (now - lastNow > (long) 120 * 1000) {
+                        redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
+                        SshUtil.execute(session, "kubectl delete job project-" + projectId);
+                        Thread.sleep(15000);
+                        while (true) {
+                            log.info("ProjectScheduler-------checkProject 准备重启项目 " + projectId);
+                            String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+                            log.info("ProjectScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
+                            int taskNumber2 = StringUtil.countSubString(podList2, "project");
+                            if (taskNumber2 == 0) {
+                                break;
+                            }
+                        }
+                        Thread.sleep(15000);
+                        log.info("ProjectScheduler-------checkProject 重新执行项目" + projectId);
+                        String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
+                        SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
+                    }
+                }
+            } catch (IOException | InterruptedException e) {
+                e.printStackTrace();
+            }
+        });
+
+        session.close();
+        client.stop();
+
+    }
+}

+ 40 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ManualProjectService.java

@@ -0,0 +1,40 @@
+package com.css.simulation.resource.scheduler.service;
+
+import api.common.pojo.constants.DictConstants;
+import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
+import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+import org.apache.ibatis.session.ExecutorType;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ManualProjectService {
+
+    @Autowired
+    private SqlSessionFactory sqlSessionFactory;
+    @Autowired
+    ManualProjectMapper manualProjectMapper;
+    @Autowired
+    TaskMapper taskMapper;
+
+    @Autowired
+    StringRedisTemplate stringRedisTemplate;
+
+    public void prepare(String manualProjectTopic,String projectId) {
+
+        //1 redis 设置项目已完成任务为 0
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":taskCompleted", "0");
+
+        try(SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false)){
+            ManualProjectMapper manualProjectMapper = sqlSession.getMapper(ManualProjectMapper.class);
+            TaskMapper taskMapper = sqlSession.getMapper(TaskMapper.class);
+            manualProjectMapper.resetProjectState(projectId, DictConstants.PROJECT_RUNNING);   // 修改该 project 的状态为执行中,同时将已完成任务重置为 0 方便测试。
+            taskMapper.deleteByProject(projectId); // 将该 project 下所有任务删除。
+            sqlSession.commit();
+        }
+    }
+
+}

Some files were not shown because too many files changed in this diff