martin пре 3 година
родитељ
комит
5a106f3bf0

+ 28 - 27
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/configuration/kubernetes/KubernetesConfiguration.java

@@ -1,29 +1,30 @@
-package com.css.simulation.resource.scheduler.configuration.kubernetes;
-
-import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.util.ClientBuilder;
-import io.kubernetes.client.util.KubeConfig;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-
-@Configuration
-public class KubernetesConfiguration {
-
-    @Bean
-    public ApiClient apiClient() throws IOException {
+//package com.css.simulation.resource.scheduler.configuration.kubernetes;
+//
+//import io.kubernetes.client.openapi.ApiClient;
+//import io.kubernetes.client.util.ClientBuilder;
+//import io.kubernetes.client.util.KubeConfig;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.util.ResourceUtils;
+//
+//import java.io.File;
+//import java.io.FileReader;
+//import java.io.IOException;
+//
+//@Configuration
+//public class KubernetesConfiguration {
+//
+//    @Bean
+//    public ApiClient apiClient() throws IOException {
 //        File config = ResourceUtils.getFile("classpath:kubernetes/config");  // 开发环境可用,生产环境不行,无法从jar 包读取
-//        File config = new File("D:\\idea-project\\simulation-cloud\\simulation-resource-scheduler\\src\\main\\resources\\kubernetes\\config");  //windows
-        File config = new File("/root/.kube/config");   //linux
+////        File config = new File("D:\\idea-project\\simulation-cloud\\simulation-resource-scheduler\\src\\main\\resources\\kubernetes\\config");  //windows
+////        File config = new File("/root/.kube/config");   //linux
+////
+////        ClassPathResource classPathResource = new ClassPathResource("kubernetes/config");
+////        InputStream inputStream = classPathResource.getInputStream();
+////        FileUtil.writeInputStreamToLocalFile();
+//        FileReader fileReader = new FileReader(config);
+//        return ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(fileReader)).build();
+//    }
 //
-//        ClassPathResource classPathResource = new ClassPathResource("kubernetes/config");
-//        InputStream inputStream = classPathResource.getInputStream();
-//        FileUtil.writeInputStreamToLocalFile();
-        FileReader fileReader = new FileReader(config);
-        return ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(fileReader)).build();
-    }
-
-}
+//}

Разлика између датотека није приказана због своје велике величине
+ 0 - 377
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java


Разлика између датотека није приказана због своје велике величине
+ 375 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumerTest.java


+ 1 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/KafkaController.java

@@ -2,7 +2,6 @@ package com.css.simulation.resource.scheduler.controller;
 
 import api.common.pojo.dto.ProjectMessageDTO;
 import api.common.util.JsonUtil;
-import com.css.simulation.resource.scheduler.consumer.ManualProjectConsumer;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -19,8 +18,7 @@ public class KafkaController {
 
     @Autowired
     KafkaTemplate<String, String> kafkaTemplate;
-    @Autowired
-    ManualProjectConsumer manualProjectConsumer;
+
 
 
     @PostMapping("/hello")

+ 2 - 14
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/TaskController.java

@@ -4,7 +4,6 @@ package com.css.simulation.resource.scheduler.controller;
 import api.common.util.IoUtil;
 import com.css.simulation.resource.scheduler.service.TaskService;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.cloud.context.config.annotation.RefreshScope;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -22,15 +21,7 @@ public class TaskController {
 
     @Autowired
     TaskService taskService;
-    @Value("${hello}")
-    String hello;
-//    @Autowired
-//    ManualProjectConsumer manualProjectConsumer;
-
-    @RequestMapping("/hello")
-    public String hello() {
-        return hello;
-    }
+
 
     @RequestMapping("/download")
     public void download(
@@ -43,10 +34,7 @@ public class TaskController {
 
 
 
-//    @PostMapping("/test")
-//    public void test(@RequestBody ProjectMessageDTO projectMessageDTO) {
-//        manualProjectConsumer.parseProject1(projectMessageDTO);
-//    }
+
 
     /**
      * Pod 的心跳接口

+ 33 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/TestController.java

@@ -0,0 +1,33 @@
+package com.css.simulation.resource.scheduler.controller;
+
+
+import api.common.pojo.dto.ProjectMessageDTO;
+import com.css.simulation.resource.scheduler.consumer.ManualProjectConsumerTest;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RefreshScope
+@RestController
+@RequestMapping("/test")
+public class TestController {
+
+    @Value("${hello}")
+    String hello;
+    @Autowired
+    ManualProjectConsumerTest manualProjectConsumerTest;
+
+    @RequestMapping("/hello")
+    public String hello() {
+        return hello;
+    }
+
+    @PostMapping("/message")
+    public void test(@RequestBody ProjectMessageDTO projectMessageDTO) {
+        manualProjectConsumerTest.parseManualProjectTest(projectMessageDTO);
+    }
+}

+ 222 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -0,0 +1,222 @@
+//package com.css.simulation.resource.scheduler.scheduler;
+//
+//import api.common.pojo.constants.DictConstants;
+//import api.common.util.CollectionUtil;
+//import api.common.util.SshUtil;
+//import api.common.util.StringUtil;
+//import api.common.util.TimeUtil;
+//import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
+//import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+//import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
+//import com.css.simulation.resource.scheduler.service.TaskService;
+//import com.css.simulation.resource.scheduler.util.KubernetesUtil;
+//import io.kubernetes.client.openapi.ApiClient;
+//import io.kubernetes.client.openapi.ApiException;
+//import lombok.extern.slf4j.Slf4j;
+//import org.apache.sshd.client.SshClient;
+//import org.apache.sshd.client.session.ClientSession;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.data.redis.core.StringRedisTemplate;
+//import org.springframework.kafka.core.KafkaTemplate;
+//import org.springframework.scheduling.annotation.Scheduled;
+//import org.springframework.stereotype.Component;
+//
+//import java.io.IOException;
+//import java.util.*;
+//import java.util.stream.Collectors;
+//
+//@Component
+//@Slf4j
+//public class ProjectScheduler {
+//
+//    @Value("${scheduler.manual-project.topic}")
+//    String manualProjectTopic;
+//    @Autowired
+//    StringRedisTemplate redisTemplate;
+//
+//    @Autowired
+//    TaskService taskService;
+//
+//    @Autowired
+//    TaskMapper taskMapper;
+//    @Autowired
+//    ManualProjectMapper projectMapper;
+//    @Value("${scheduler.manual-project.job-yaml}")
+//    String jobYaml;
+//
+//    @Value("${scheduler.score.hostname}")
+//    String hostnameScore;
+//    @Value("${scheduler.score.username}")
+//    String usernameScore;
+//    @Value("${scheduler.score.password}")
+//    String passwordScore;
+//
+//    @Autowired
+//    ApiClient apiClient;
+//
+//    @Autowired
+//    KafkaTemplate<String, String> kafkaTemplate;
+//
+//
+//    /**
+//     * 解决 pod 莫名奇妙关闭的问题
+//     *
+//     * @throws ApiException 异常
+//     */
+//    @Scheduled(fixedDelay = 60 * 1000)
+//    public void retry() throws ApiException {
+//
+//
+//        log.info("------- ProjectScheduler--retry 检查 pod 是否需要重试!");
+//        //1 从 redis 获取 手动运行项目的 key 列表
+//        Set<String> keys = redisTemplate.keys("manualProject:*");
+//
+//        //2 根据 key 列表从 redis 获取 pod 列表
+//        assert keys != null;
+//        List<String> podKeyList = keys.stream().filter(key -> key.contains("pod")).collect(Collectors.toList());
+//        Map<String, String> podNameMapShouldBe = new HashMap<>();
+//        podKeyList.forEach(podKey -> {
+//            String podName = redisTemplate.opsForValue().get(podKey);
+//            podNameMapShouldBe.put(podKey, podName);
+//        });
+//        //3 通过 kubernetes 获取 pod 列表
+//        List<String> podNameListReally = KubernetesUtil.getPod(apiClient, "all");
+//        //4 比对 redis 中的 pod 列表 和 kubernetes 中的 pod 列表,如果有 redis 中存在但 kubernetes 中不存在则准备重试
+//        podNameMapShouldBe.forEach((podKeyShouldBe, podNameShouldBe) -> {
+//            if (!podNameListReally.contains(podNameShouldBe)) {
+//
+//                //4-1 根据 podKey 获取 projectId 和 taskId
+//                String[] split = podKeyShouldBe.split(":");
+//                String projectId = split[1];
+//                String taskId = split[2];
+//                //4-2 根据 projectId 和 taskId 列表从 redis 获取重试次数 retry
+//                String retry = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry");
+//                assert retry != null;
+//                int retryNumber = Integer.parseInt(retry);
+//                if (retryNumber < 3) {
+//                    log.info("------- ProjectScheduler--retry 准备第" + retryNumber + "次重试!");
+//                    String taskJson = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":message");
+//                    int finalRetryNumber = retryNumber + 1;
+//                    redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry", finalRetryNumber + "");
+//                    kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
+//                        // 消息发送到的topic
+//                        assert success != null;
+//                        String topic = success.getRecordMetadata().topic();
+//                        // 消息发送到的分区
+//                        int partition = success.getRecordMetadata().partition();
+//                        // 消息在分区内的offset
+//                        long offset = success.getRecordMetadata().offset();
+//                        log.info("------- ProjectScheduler--retry 项目 " + projectId + "的任务准备第" + finalRetryNumber + "次重试,"
+//                                + "发送消息成功:\n"
+//                                + "主题 topic 为:" + topic + "\n"
+//                                + "分区 partition 为:" + partition + "\n"
+//                                + "偏移量为:" + offset + "\n"
+//                                + "消息体为:" + taskJson);
+//                    }, failure -> {
+//                        log.error("------- ProjectScheduler--retry 发送消息失败:" + failure.getMessage());
+//                    });
+//                }
+//            }
+//        });
+//
+//    }
+//
+//    @Scheduled(fixedDelay = 60 * 1000)
+//    public void tick() throws IOException {
+//
+//        SshClient client = SshUtil.getClient();
+//        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
+//
+//        ArrayList<TaskPO> executingTaskList = taskMapper.selectExecuting();
+//        if (CollectionUtil.isEmpty(executingTaskList)) {
+//            //        log.info("------- ProjectScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
+//            //2 根据 key 查出任务的心跳时间
+//            for (TaskPO task : executingTaskList) {
+//                String taskId = task.getId();
+//                String projectId = task.getPId();
+//                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick");
+////                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
+//                assert s != null;
+//                long tickTime = Long.parseLong(s);
+//                long timeout = 2 * 60 * 1000L;
+//                long now = TimeUtil.getNow();
+//                long difference = now - tickTime;
+////                log.info("------- ProjectScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
+//                if (difference > timeout) {
+//                    String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
+//                    String podDeleteCommand = "kubectl delete pod " + podName;
+//                    if (podName != null) {
+//                        log.info("ProjectScheduler--tick 修改任务 " + taskId + "已超时,状态修改为 Aborted,pod 名称为:" + podName
+//                                + ",并执行删除 pod 命令:" + podDeleteCommand);
+//                        SshUtil.execute(session, podDeleteCommand);
+////            taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql()); // 如果任务 abort 代表项目失败
+//                        taskMapper.updateFailStateWithStopTime(taskId, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_1);
+//                    }
+//                }
+//            }
+//        }
+//        session.close();
+//        client.stop();
+//    }
+//
+//
+//    /**
+//     * 解决 pod 莫名全部关闭但是 job 还在的问题
+//     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
+//     */
+//    @Scheduled(fixedDelay = 30 * 1000)
+//    public void checkProject() throws IOException {
+//        SshClient client = SshUtil.getClient();
+//        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
+//
+//        //1 查询出正在运行中的 project
+//        List<String> projectIdList = projectMapper.selectIdByState("20");
+//        log.info("ProjectScheduler-------checkProject 查询出正在运行中的 project" + projectIdList);
+//        //2 根据 projectId 获取 pod
+//        projectIdList.forEach(projectId -> {
+//            try {
+//                String key = manualProjectTopic + ":" + projectId + ":check";
+//                String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
+//                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+//                log.info("ProjectScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
+//                int taskNumber = StringUtil.countSubString(podList, "project");
+//                if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
+//                    redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
+//                }
+//
+//                if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {
+//                    // 判断两次是否超过2分钟
+//                    //3 如果 pod 为空,则重启 job
+//                    long lastNow = Long.parseLong(lastNowString);
+//                    long now = Long.parseLong(TimeUtil.getNowString());
+//
+//                    if (now - lastNow > (long) 120 * 1000) {
+//                        redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
+//                        SshUtil.execute(session, "kubectl delete job project-" + projectId);
+//                        Thread.sleep(15000);
+//                        while (true) {
+//                            log.info("ProjectScheduler-------checkProject 准备重启项目 " + projectId);
+//                            String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+//                            log.info("ProjectScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
+//                            int taskNumber2 = StringUtil.countSubString(podList2, "project");
+//                            if (taskNumber2 == 0) {
+//                                break;
+//                            }
+//                        }
+//                        Thread.sleep(15000);
+//                        log.info("ProjectScheduler-------checkProject 重新执行项目" + projectId);
+//                        String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
+//                        SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
+//                    }
+//                }
+//            } catch (IOException | InterruptedException e) {
+//                e.printStackTrace();
+//            }
+//        });
+//
+//        session.close();
+//        client.stop();
+//
+//    }
+//}

+ 0 - 222
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -1,222 +0,0 @@
-package com.css.simulation.resource.scheduler.scheduler;
-
-import api.common.pojo.constants.DictConstants;
-import api.common.util.CollectionUtil;
-import api.common.util.SshUtil;
-import api.common.util.StringUtil;
-import api.common.util.TimeUtil;
-import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
-import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
-import com.css.simulation.resource.scheduler.service.TaskService;
-import com.css.simulation.resource.scheduler.util.KubernetesUtil;
-import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.openapi.ApiException;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.sshd.client.SshClient;
-import org.apache.sshd.client.session.ClientSession;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.stream.Collectors;
-
-@Component
-@Slf4j
-public class TickScheduler {
-
-    @Value("${scheduler.manual-project.topic}")
-    String manualProjectTopic;
-    @Autowired
-    StringRedisTemplate redisTemplate;
-
-    @Autowired
-    TaskService taskService;
-
-    @Autowired
-    TaskMapper taskMapper;
-    @Autowired
-    ManualProjectMapper projectMapper;
-    @Value("${scheduler.manual-project.job-yaml}")
-    String jobYaml;
-
-    @Value("${scheduler.score.hostname}")
-    String hostnameScore;
-    @Value("${scheduler.score.username}")
-    String usernameScore;
-    @Value("${scheduler.score.password}")
-    String passwordScore;
-
-    @Autowired
-    ApiClient apiClient;
-
-    @Autowired
-    KafkaTemplate<String, String> kafkaTemplate;
-
-
-    /**
-     * 解决 pod 莫名奇妙关闭的问题
-     *
-     * @throws ApiException 异常
-     */
-    @Scheduled(fixedDelay = 60 * 1000)
-    public void retry() throws ApiException {
-
-
-        log.info("------- TickScheduler--retry 检查 pod 是否需要重试!");
-        //1 从 redis 获取 手动运行项目的 key 列表
-        Set<String> keys = redisTemplate.keys("manualProject:*");
-
-        //2 根据 key 列表从 redis 获取 pod 列表
-        assert keys != null;
-        List<String> podKeyList = keys.stream().filter(key -> key.contains("pod")).collect(Collectors.toList());
-        Map<String, String> podNameMapShouldBe = new HashMap<>();
-        podKeyList.forEach(podKey -> {
-            String podName = redisTemplate.opsForValue().get(podKey);
-            podNameMapShouldBe.put(podKey, podName);
-        });
-        //3 通过 kubernetes 获取 pod 列表
-        List<String> podNameListReally = KubernetesUtil.getPod(apiClient, "all");
-        //4 比对 redis 中的 pod 列表 和 kubernetes 中的 pod 列表,如果有 redis 中存在但 kubernetes 中不存在则准备重试
-        podNameMapShouldBe.forEach((podKeyShouldBe, podNameShouldBe) -> {
-            if (!podNameListReally.contains(podNameShouldBe)) {
-
-                //4-1 根据 podKey 获取 projectId 和 taskId
-                String[] split = podKeyShouldBe.split(":");
-                String projectId = split[1];
-                String taskId = split[2];
-                //4-2 根据 projectId 和 taskId 列表从 redis 获取重试次数 retry
-                String retry = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry");
-                assert retry != null;
-                int retryNumber = Integer.parseInt(retry);
-                if (retryNumber < 3) {
-                    log.info("------- TickScheduler--retry 准备第" + retryNumber + "次重试!");
-                    String taskJson = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":message");
-                    int finalRetryNumber = retryNumber + 1;
-                    redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry", finalRetryNumber + "");
-                    kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
-                        // 消息发送到的topic
-                        assert success != null;
-                        String topic = success.getRecordMetadata().topic();
-                        // 消息发送到的分区
-                        int partition = success.getRecordMetadata().partition();
-                        // 消息在分区内的offset
-                        long offset = success.getRecordMetadata().offset();
-                        log.info("------- TickScheduler--retry 项目 " + projectId + "的任务准备第" + finalRetryNumber + "次重试,"
-                                + "发送消息成功:\n"
-                                + "主题 topic 为:" + topic + "\n"
-                                + "分区 partition 为:" + partition + "\n"
-                                + "偏移量为:" + offset + "\n"
-                                + "消息体为:" + taskJson);
-                    }, failure -> {
-                        log.error("------- TickScheduler--retry 发送消息失败:" + failure.getMessage());
-                    });
-                }
-            }
-        });
-
-    }
-
-    @Scheduled(fixedDelay = 60 * 1000)
-    public void tick() throws IOException {
-
-        SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
-
-        ArrayList<TaskPO> executingTaskList = taskMapper.selectExecuting();
-        if (CollectionUtil.isEmpty(executingTaskList)) {
-            //        log.info("------- TickScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
-            //2 根据 key 查出任务的心跳时间
-            for (TaskPO task : executingTaskList) {
-                String taskId = task.getId();
-                String projectId = task.getPId();
-                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick");
-//                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
-                assert s != null;
-                long tickTime = Long.parseLong(s);
-                long timeout = 2 * 60 * 1000L;
-                long now = TimeUtil.getNow();
-                long difference = now - tickTime;
-//                log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
-                if (difference > timeout) {
-                    String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
-                    String podDeleteCommand = "kubectl delete pod " + podName;
-                    if (podName != null) {
-                        log.info("TickScheduler--tick 修改任务 " + taskId + "已超时,状态修改为 Aborted,pod 名称为:" + podName
-                                + ",并执行删除 pod 命令:" + podDeleteCommand);
-                        SshUtil.execute(session, podDeleteCommand);
-//            taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql()); // 如果任务 abort 代表项目失败
-                        taskMapper.updateFailStateWithStopTime(taskId, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_1);
-                    }
-                }
-            }
-        }
-        session.close();
-        client.stop();
-    }
-
-
-    /**
-     * 解决 pod 莫名全部关闭但是 job 还在的问题
-     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
-     */
-    @Scheduled(fixedDelay = 30 * 1000)
-    public void checkProject() throws IOException {
-        SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
-
-        //1 查询出正在运行中的 project
-        List<String> projectIdList = projectMapper.selectIdByState("20");
-        log.info("TickScheduler-------checkProject 查询出正在运行中的 project" + projectIdList);
-        //2 根据 projectId 获取 pod
-        projectIdList.forEach(projectId -> {
-            try {
-                String key = manualProjectTopic + ":" + projectId + ":check";
-                String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
-                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-                log.info("TickScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
-                int taskNumber = StringUtil.countSubString(podList, "project");
-                if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
-                    redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
-                }
-
-                if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {
-                    // 判断两次是否超过2分钟
-                    //3 如果 pod 为空,则重启 job
-                    long lastNow = Long.parseLong(lastNowString);
-                    long now = Long.parseLong(TimeUtil.getNowString());
-
-                    if (now - lastNow > (long) 120 * 1000) {
-                        redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
-                        SshUtil.execute(session, "kubectl delete job project-" + projectId);
-                        Thread.sleep(15000);
-                        while (true) {
-                            log.info("TickScheduler-------checkProject 准备重启项目 " + projectId);
-                            String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-                            log.info("TickScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
-                            int taskNumber2 = StringUtil.countSubString(podList2, "project");
-                            if (taskNumber2 == 0) {
-                                break;
-                            }
-                        }
-                        Thread.sleep(15000);
-                        log.info("TickScheduler-------checkProject 重新执行项目" + projectId);
-                        String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
-                        SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
-                    }
-                }
-            } catch (IOException | InterruptedException e) {
-                e.printStackTrace();
-            }
-        });
-
-        session.close();
-        client.stop();
-
-    }
-}

+ 2 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -254,7 +254,8 @@ public class TaskService {
                     } catch (Exception e) {
                         task2.setRunState(DictConstants.TASK_ABORTED);
                         taskMapper.updateFailStateWithStopTime(task2Id, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_5);
-                        throw new RuntimeException(e.getMessage());
+                        log.error(e.getMessage());
+                        return; // 如果打分失败则开始下一个打分
                     }
                     assert score != null;
                     task2.setReturnSceneId(score.getUnit_scene_ID());

Неке датотеке нису приказане због велике количине промена