martin před 3 roky
rodič
revize
29d41b13b1

+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -186,8 +186,8 @@ public class ManualProjectConsumer {
                     taskPO.setModifyTime(TimeUtil.getNowForMysql());
                     taskPO.setIsDeleted("0");
                     taskMapper.insert(taskPO);
-                    // 心跳信息存在緩存中
-                    redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick", TimeUtil.getNowString());
+
+                    redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry", "0");
                     // 组装 task 消息
                     TaskTO taskTO = TaskTO.builder()
                             .info(InfoTO.builder()

+ 210 - 161
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -1,161 +1,210 @@
-//package com.css.simulation.resource.scheduler.scheduler;
-//
-//import api.common.pojo.constants.DictConstants;
-//import api.common.util.*;
-//import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
-//import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-//import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
-//import com.css.simulation.resource.scheduler.service.TaskService;
-//import lombok.extern.slf4j.Slf4j;
-//import org.apache.sshd.client.SshClient;
-//import org.apache.sshd.client.session.ClientSession;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.data.redis.core.StringRedisTemplate;
-//import org.springframework.scheduling.annotation.Scheduled;
-//import org.springframework.stereotype.Component;
-//
-//import java.io.IOException;
-//import java.util.ArrayList;
-//import java.util.List;
-//import java.util.Set;
-//
-//@Component
-//@Slf4j
-//public class TickScheduler {
-//
-//    @Value("${scheduler.manual-project.topic}")
-//    String manualProjectTopic;
-//    @Autowired
-//    StringRedisTemplate redisTemplate;
-//
-//    @Autowired
-//    TaskService taskService;
-//
-//    @Autowired
-//    TaskMapper taskMapper;
-//    @Autowired
-//    ManualProjectMapper projectMapper;
-//    @Value("${scheduler.manual-project.job-yaml}")
-//    String jobYaml;
-//
-//    @Value("${scheduler.score.hostname}")
-//    String hostnameScore;
-//    @Value("${scheduler.score.username}")
-//    String usernameScore;
-//    @Value("${scheduler.score.password}")
-//    String passwordScore;
-//
-//
-//    @Scheduled(fixedDelay = 60 * 1000)
-//    public void retry() throws IOException {
-//
-//        //1 从 redis 获取 手动运行项目的 key 列表
-//        Set<String> keys = redisTemplate.keys("manualProject:*");
-//
-//        //2 根据 key 列表从 redis 获取 pod 列表
-//        //3 通过 kubernetes 获取 pod 列表
-//        //4 比对 redis 中的 pod 列表 和 kubernetes 中的 pod 列表,如果有 redis 中存在但 kubernetes 中不存在则准备重试
-//        //4-1 根据 key 列表从 mysql 获取任务列表
-//        //4-2 查看重试次数是否为 3
-//        //4-3 如果重试次数小于 3 则从 redis 获取 message 并重新发送给 kafka
-//    }
-//
-//    @Scheduled(fixedDelay = 60 * 1000)
-//    public void tick() throws IOException {
-//
-//        SshClient client = SshUtil.getClient();
-//        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
-//
-//        ArrayList<TaskPO> executingTaskList = taskMapper.selectExecuting();
-//        if (CollectionUtil.isEmpty(executingTaskList)) {
-//            //        log.info("------- TickScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
-//            //2 根据 key 查出任务的心跳时间
-//            for (TaskPO task : executingTaskList) {
-//                String taskId = task.getId();
-//                String projectId = task.getPId();
-//                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick");
-////                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
-//                assert s != null;
-//                long tickTime = Long.parseLong(s);
-//                long timeout = 2 * 60 * 1000L;
-//                long now = TimeUtil.getNow();
-//                long difference = now - tickTime;
-////                log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
-//                if (difference > timeout) {
-//                    String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
-//                    String podDeleteCommand = "kubectl delete pod " + podName;
-//                    if (podName != null) {
-//                        log.info("TickScheduler--tick 修改任务 " + taskId + "已超时,状态修改为 Aborted,pod 名称为:" + podName
-//                                + ",并执行删除 pod 命令:" + podDeleteCommand);
-//                        SshUtil.execute(session, podDeleteCommand);
-////            taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql()); // 如果任务 abort 代表项目失败
-//                        taskMapper.updateFailStateWithStopTime(taskId, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_1);
-//                        redisTemplate.delete("podName:" + taskId);
-//                    }
-//                }
-//            }
-//        }
-//        session.close();
-//        client.stop();
-//    }
-//
-//
-//    /**
-//     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
-//     */
-//    @Scheduled(fixedDelay = 30 * 1000)
-//    public void checkProject() throws IOException {
-//        SshClient client = SshUtil.getClient();
-//        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
-//
-//        //1 查询出正在运行中的 project
-//        List<String> projectIdList = projectMapper.selectIdByState("20");
-//        log.info("TickScheduler-------checkProject 查询出正在运行中的 project" + projectIdList);
-//        //2 根据 projectId 获取 pod
-//        projectIdList.forEach(projectId -> {
-//            try {
-//                String key = manualProjectTopic + ":" + projectId + ":check";
-//                String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
-//                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-//                log.info("TickScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
-//                int taskNumber = StringUtil.countSubString(podList, "project");
-//                if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
-//                    redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
-//                }
-//
-//                if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {
-//                    // 判断两次是否超过2分钟
-//                    //3 如果 pod 为空,则重启 job
-//                    long lastNow = Long.parseLong(lastNowString);
-//                    long now = Long.parseLong(TimeUtil.getNowString());
-//
-//                    if (now - lastNow > (long) 120 * 1000) {
-//                        redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
-//                        SshUtil.execute(session, "kubectl delete job project-" + projectId);
-//                        Thread.sleep(15000);
-//                        while (true) {
-//                            log.info("TickScheduler-------checkProject 准备重启项目 " + projectId);
-//                            String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-//                            log.info("TickScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
-//                            int taskNumber2 = StringUtil.countSubString(podList2, "project");
-//                            if (taskNumber2 == 0) {
-//                                break;
-//                            }
-//                        }
-//                        Thread.sleep(15000);
-//                        log.info("TickScheduler-------checkProject 重新执行项目" + projectId);
-//                        String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
-//                        SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
-//                    }
-//                }
-//            } catch (IOException | InterruptedException e) {
-//                e.printStackTrace();
-//            }
-//        });
-//
-//        session.close();
-//        client.stop();
-//
-//    }
-//}
+package com.css.simulation.resource.scheduler.scheduler;
+
+import api.common.pojo.constants.DictConstants;
+import api.common.util.CollectionUtil;
+import api.common.util.SshUtil;
+import api.common.util.StringUtil;
+import api.common.util.TimeUtil;
+import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
+import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
+import com.css.simulation.resource.scheduler.service.TaskService;
+import com.css.simulation.resource.scheduler.util.KubernetesUtil;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.session.ClientSession;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+@Component
+@Slf4j
+public class TickScheduler {
+
+    @Value("${scheduler.manual-project.topic}")
+    String manualProjectTopic;
+    @Autowired
+    StringRedisTemplate redisTemplate;
+
+    @Autowired
+    TaskService taskService;
+
+    @Autowired
+    TaskMapper taskMapper;
+    @Autowired
+    ManualProjectMapper projectMapper;
+    @Value("${scheduler.manual-project.job-yaml}")
+    String jobYaml;
+
+    @Value("${scheduler.score.hostname}")
+    String hostnameScore;
+    @Value("${scheduler.score.username}")
+    String usernameScore;
+    @Value("${scheduler.score.password}")
+    String passwordScore;
+
+    @Autowired
+    ApiClient apiClient;
+
+    @Autowired
+    KafkaTemplate<String, String> kafkaTemplate;
+
+
+    @Scheduled(fixedDelay = 60 * 1000)
+    public void retry() throws ApiException {
+        //1 从 redis 获取 手动运行项目的 key 列表
+        Set<String> keys = redisTemplate.keys("manualProject:*");
+
+        //2 根据 key 列表从 redis 获取 pod 列表
+        assert keys != null;
+        List<String> podKeyList = keys.stream().filter(key -> key.contains("pod")).collect(Collectors.toList());
+        Map<String, String> podNameMapShouldBe = new HashMap<>();
+        podKeyList.forEach(podKey -> {
+            String podName = redisTemplate.opsForValue().get(podKey);
+            podNameMapShouldBe.put(podKey, podName);
+        });
+        //3 通过 kubernetes 获取 pod 列表
+        List<String> podNameListReally = KubernetesUtil.getPod(apiClient, "all");
+        //4 比对 redis 中的 pod 列表 和 kubernetes 中的 pod 列表,如果有 redis 中存在但 kubernetes 中不存在则准备重试
+        podNameMapShouldBe.forEach((podKeyShouldBe, podNameShouldBe) -> {
+            if (!podNameListReally.contains(podNameShouldBe)) {
+
+                //4-1 根据 podKey 获取 projectId 和 taskId
+                String[] split = podKeyShouldBe.split(":");
+                String projectId = split[1];
+                String taskId = split[2];
+                //4-2 根据 projectId 和 taskId 列表从 redis 获取重试次数 retry
+                String retry = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry");
+                assert retry != null;
+                int retryNumber = Integer.parseInt(retry);
+                if (retryNumber < 3) {
+                    String taskJson = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":message");
+                    kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
+                        // 消息发送到的topic
+                        assert success != null;
+                        String topic = success.getRecordMetadata().topic();
+                        // 消息发送到的分区
+                        int partition = success.getRecordMetadata().partition();
+                        // 消息在分区内的offset
+                        long offset = success.getRecordMetadata().offset();
+                        log.info("------- TickScheduler--retry 发送消息成功:\n"
+                                + "主题 topic 为:" + topic + "\n"
+                                + "分区 partition 为:" + partition + "\n"
+                                + "偏移量为:" + offset + "\n"
+                                + "消息体为:" + taskJson);
+                    }, failure -> {
+                        log.error("------- ManualProjectConsumer 发送消息失败:" + failure.getMessage());
+                    });
+                }
+            }
+        });
+
+    }
+
+    @Scheduled(fixedDelay = 60 * 1000)
+    public void tick() throws IOException {
+
+        SshClient client = SshUtil.getClient();
+        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
+
+        ArrayList<TaskPO> executingTaskList = taskMapper.selectExecuting();
+        if (CollectionUtil.isEmpty(executingTaskList)) {
+            //        log.info("------- TickScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
+            //2 根据 key 查出任务的心跳时间
+            for (TaskPO task : executingTaskList) {
+                String taskId = task.getId();
+                String projectId = task.getPId();
+                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick");
+//                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
+                assert s != null;
+                long tickTime = Long.parseLong(s);
+                long timeout = 2 * 60 * 1000L;
+                long now = TimeUtil.getNow();
+                long difference = now - tickTime;
+//                log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
+                if (difference > timeout) {
+                    String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
+                    String podDeleteCommand = "kubectl delete pod " + podName;
+                    if (podName != null) {
+                        log.info("TickScheduler--tick 修改任务 " + taskId + "已超时,状态修改为 Aborted,pod 名称为:" + podName
+                                + ",并执行删除 pod 命令:" + podDeleteCommand);
+                        SshUtil.execute(session, podDeleteCommand);
+//            taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql()); // 如果任务 abort 代表项目失败
+                        taskMapper.updateFailStateWithStopTime(taskId, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_1);
+                        redisTemplate.delete("podName:" + taskId);
+                    }
+                }
+            }
+        }
+        session.close();
+        client.stop();
+    }
+
+
+    /**
+     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
+     */
+    @Scheduled(fixedDelay = 30 * 1000)
+    public void checkProject() throws IOException {
+        SshClient client = SshUtil.getClient();
+        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
+
+        //1 查询出正在运行中的 project
+        List<String> projectIdList = projectMapper.selectIdByState("20");
+        log.info("TickScheduler-------checkProject 查询出正在运行中的 project" + projectIdList);
+        //2 根据 projectId 获取 pod
+        projectIdList.forEach(projectId -> {
+            try {
+                String key = manualProjectTopic + ":" + projectId + ":check";
+                String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
+                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+                log.info("TickScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
+                int taskNumber = StringUtil.countSubString(podList, "project");
+                if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
+                    redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
+                }
+
+                if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {
+                    // 判断两次是否超过2分钟
+                    //3 如果 pod 为空,则重启 job
+                    long lastNow = Long.parseLong(lastNowString);
+                    long now = Long.parseLong(TimeUtil.getNowString());
+
+                    if (now - lastNow > (long) 120 * 1000) {
+                        redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
+                        SshUtil.execute(session, "kubectl delete job project-" + projectId);
+                        Thread.sleep(15000);
+                        while (true) {
+                            log.info("TickScheduler-------checkProject 准备重启项目 " + projectId);
+                            String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+                            log.info("TickScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
+                            int taskNumber2 = StringUtil.countSubString(podList2, "project");
+                            if (taskNumber2 == 0) {
+                                break;
+                            }
+                        }
+                        Thread.sleep(15000);
+                        log.info("TickScheduler-------checkProject 重新执行项目" + projectId);
+                        String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
+                        SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
+                    }
+                }
+            } catch (IOException | InterruptedException e) {
+                e.printStackTrace();
+            }
+        });
+
+        session.close();
+        client.stop();
+
+    }
+}

+ 0 - 28
simulation-resource-scheduler/src/test/java/com/css/simulation/resource/scheduler/SchedulerTest.java

@@ -9,7 +9,6 @@ import com.css.simulation.resource.scheduler.mapper.TaskIndexMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
 import com.css.simulation.resource.scheduler.pojo.po.IndexTemplatePO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskIndexPO;
-import com.css.simulation.resource.scheduler.util.KubernetesUtil;
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.openapi.ApiException;
 import lombok.extern.slf4j.Slf4j;
@@ -45,34 +44,7 @@ public class SchedulerTest {
 
     @Test
     public void redisTemplate() throws ApiException {
-        //1 从 redis 获取 手动运行项目的 key 列表
-        Set<String> keys = redisTemplate.keys("manualProject:*");
-
-        //2 根据 key 列表从 redis 获取 pod 列表
-        assert keys != null;
-        List<String> podKeyList = keys.stream().filter(key -> key.contains("pod")).collect(Collectors.toList());
-        Map<String, String> podNameMapShouldBe = new HashMap<>();
-        podKeyList.forEach(podKey -> {
-            String podName = redisTemplate.opsForValue().get(podKey);
-            podNameMapShouldBe.put(podKey, podName);
-        });
-        //3 通过 kubernetes 获取 pod 列表
-        List<String> podNameListReally = KubernetesUtil.getPod(apiClient, "all");
-        //4 比对 redis 中的 pod 列表 和 kubernetes 中的 pod 列表,如果有 redis 中存在但 kubernetes 中不存在则准备重试
-        podNameMapShouldBe.entrySet().forEach(podNameEntryShouldBe -> {
-            String podKeyShouldBe = podNameEntryShouldBe.getKey();
-            String podNameShouldBe = podNameEntryShouldBe.getValue();
-            if (!podNameListReally.contains(podNameShouldBe)) {
-
-                //4-1 根据 podKey 获取 taskId
-                String taskId = podKeyShouldBe.split(":")[2];
-                //4-2 根据 taskId 列表从 mysql 获取 taskPO 列表
-                taskMapper.selectRetryCountById
-                //4-2 查看重试次数是否为 3 ,如果重试次数小于 3 则从 redis 获取 message 并重新发送给 kafka 并重试次数 +1
-            }
-        });
 
-        System.out.println(all);
     }
 
     @Test