martin 3 anos atrás
pai
commit
eba0afd700

+ 1 - 1
pom.xml

@@ -53,7 +53,7 @@
         <commons-pool2.version>2.11.1</commons-pool2.version>
         <minio.version>8.3.5</minio.version>
         <okhttp.version>4.9.3</okhttp.version>
-        <kubernetes.version>14.0.0</kubernetes.version>
+        <kubernetes.version>15.0.1</kubernetes.version>
         <docker-java.version>3.2.13</docker-java.version>
 
     </properties>

+ 3 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/configuration/kubernetes/KubernetesConfiguration.java

@@ -5,6 +5,7 @@ import io.kubernetes.client.util.ClientBuilder;
 import io.kubernetes.client.util.KubeConfig;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.util.ResourceUtils;
 
 import java.io.File;
 import java.io.FileReader;
@@ -15,9 +16,9 @@ public class KubernetesConfiguration {
 
     @Bean
     public ApiClient apiClient() throws IOException {
-//        File config = ResourceUtils.getFile("classpath:kubernetes/config");  // 开发环境可用,生产环境不行,无法从jar 包读取
+        File config = ResourceUtils.getFile("classpath:kubernetes/config");  // NOTE(review): ResourceUtils.getFile cannot resolve a classpath resource from inside a packaged jar — works in dev (exploded classes) only; confirm production deployment before relying on this
 //        File config = new File("D:\\idea-project\\simulation-cloud\\simulation-resource-scheduler\\src\\main\\resources\\kubernetes\\config");  //windows
-        File config = new File("/root/.kube/config");   //linux
+//        File config = new File("/root/.kube/config");   //linux
 //
 //        ClassPathResource classPathResource = new ClassPathResource("kubernetes/config");
 //        InputStream inputStream = classPathResource.getInputStream();

+ 2 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -11,7 +11,6 @@ import com.css.simulation.resource.scheduler.util.MinioUtil;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import io.kubernetes.client.openapi.ApiClient;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -65,8 +64,8 @@ public class ManualProjectConsumer {
     SensorOgtMapper sensorOgtMapper;
     @Autowired
     AlgorithmMapper algorithmMapper;
-    @Autowired
-    ApiClient apiClient;
+//    @Autowired
+//    ApiClient apiClient;
     @Value("${scheduler.manual-project.topic}")
     String manualProjectTopic;
     @Value("${scheduler.manual-project.result-path-minio}")

+ 2 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/TaskController.java

@@ -2,7 +2,6 @@ package com.css.simulation.resource.scheduler.controller;
 
 
 import api.common.util.IoUtil;
-import com.css.simulation.resource.scheduler.consumer.ManualProjectConsumer;
 import com.css.simulation.resource.scheduler.service.TaskService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -25,8 +24,8 @@ public class TaskController {
     TaskService taskService;
     @Value("${hello}")
     String hello;
-    @Autowired
-    ManualProjectConsumer manualProjectConsumer;
+//    @Autowired
+//    ManualProjectConsumer manualProjectConsumer;
 
     @RequestMapping("/hello")
     public String hello() {

+ 150 - 147
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -1,55 +1,58 @@
-package com.css.simulation.resource.scheduler.scheduler;
-
-import api.common.pojo.constants.DictConstants;
-import api.common.util.*;
-import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
-import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
-import com.css.simulation.resource.scheduler.service.TaskService;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.sshd.client.SshClient;
-import org.apache.sshd.client.session.ClientSession;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-@Component
-@Slf4j
-public class TickScheduler {
-
-    @Value("${scheduler.manual-project.topic}")
-    String manualProjectTopic;
-    @Autowired
-    StringRedisTemplate redisTemplate;
-
-    @Autowired
-    TaskService taskService;
-
-    @Autowired
-    TaskMapper taskMapper;
-    @Autowired
-    ProjectMapper projectMapper;
-    @Value("${scheduler.manual-project.job-yaml}")
-    String jobYaml;
-
-    @Value("${scheduler.score.hostname}")
-    String hostnameScore;
-    @Value("${scheduler.score.username}")
-    String usernameScore;
-    @Value("${scheduler.score.password}")
-    String passwordScore;
-
-
+//package com.css.simulation.resource.scheduler.scheduler;
+//
+//import api.common.pojo.constants.DictConstants;
+//import api.common.util.*;
+//import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
+//import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+//import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
+//import com.css.simulation.resource.scheduler.service.TaskService;
+//import lombok.extern.slf4j.Slf4j;
+//import org.apache.sshd.client.SshClient;
+//import org.apache.sshd.client.session.ClientSession;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.data.redis.core.StringRedisTemplate;
+//import org.springframework.scheduling.annotation.Scheduled;
+//import org.springframework.stereotype.Component;
+//
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.List;
+//import java.util.Set;
+//
+//@Component
+//@Slf4j
+//public class TickScheduler {
+//
+//    @Value("${scheduler.manual-project.topic}")
+//    String manualProjectTopic;
+//    @Autowired
+//    StringRedisTemplate redisTemplate;
+//
+//    @Autowired
+//    TaskService taskService;
+//
+//    @Autowired
+//    TaskMapper taskMapper;
+//    @Autowired
+//    ProjectMapper projectMapper;
+//    @Value("${scheduler.manual-project.job-yaml}")
+//    String jobYaml;
+//
+//    @Value("${scheduler.score.hostname}")
+//    String hostnameScore;
+//    @Value("${scheduler.score.username}")
+//    String usernameScore;
+//    @Value("${scheduler.score.password}")
+//    String passwordScore;
+//
+//
 //    @Scheduled(fixedDelay = 60 * 1000)
 //    public void retry() throws IOException {
 //
 //        //1 从 redis 获取 手动运行项目的 key 列表
+//        Set<String> keys = redisTemplate.keys("manualProject:*");
+//
 //        //2 根据 key 列表从 redis 获取 pod 列表
 //        //3 通过 kubernetes 获取 pod 列表
 //        //4 比对 redis 中的 pod 列表 和 kubernetes 中的 pod 列表,如果有 redis 中存在但 kubernetes 中不存在则准备重试
@@ -57,102 +60,102 @@ public class TickScheduler {
 //        //4-2 查看重试次数是否为 3
 //        //4-3 如果重试次数小于 3 则从 redis 获取 message 并重新发送给 kafka
 //    }
-
-    @Scheduled(fixedDelay = 60 * 1000)
-    public void tick() throws IOException {
-
-        SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
-
-        ArrayList<TaskPO> executingTaskList = taskMapper.selectExecuting();
-        if (CollectionUtil.isEmpty(executingTaskList)) {
-            //        log.info("------- TickScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
-            //2 根据 key 查出任务的心跳时间
-            for (TaskPO task : executingTaskList) {
-                String taskId = task.getId();
-                String projectId = task.getPId();
-                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick");
-//                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
-                assert s != null;
-                long tickTime = Long.parseLong(s);
-                long timeout = 2 * 60 * 1000L;
-                long now = TimeUtil.getNow();
-                long difference = now - tickTime;
-//                log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
-                if (difference > timeout) {
-                    String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
-                    String podDeleteCommand = "kubectl delete pod " + podName;
-                    if (podName != null) {
-                        log.info("TickScheduler--tick 修改任务 " + taskId + "已超时,状态修改为 Aborted,pod 名称为:" + podName
-                                + ",并执行删除 pod 命令:" + podDeleteCommand);
-                        SshUtil.execute(session, podDeleteCommand);
-//            taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql()); // 如果任务 abort 代表项目失败
-                        taskMapper.updateFailStateWithStopTime(taskId, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_1);
-                        redisTemplate.delete("podName:" + taskId);
-                    }
-                }
-            }
-        }
-        session.close();
-        client.stop();
-    }
-
-
-    /**
-     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
-     */
-    @Scheduled(fixedDelay = 30 * 1000)
-    public void checkProject() throws IOException {
-        SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
-
-        //1 查询出正在运行中的 project
-        List<String> projectIdList = projectMapper.selectIdByState("20");
-        log.info("TickScheduler-------checkProject 查询出正在运行中的 project" + projectIdList);
-        //2 根据 projectId 获取 pod
-        projectIdList.forEach(projectId -> {
-            try {
-                String key = manualProjectTopic + ":" + projectId + ":check";
-                String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
-                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-                log.info("TickScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
-                int taskNumber = StringUtil.countSubString(podList, "project");
-                if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
-                    redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
-                }
-
-                if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {
-                    // 判断两次是否超过2分钟
-                    //3 如果 pod 为空,则重启 job
-                    long lastNow = Long.parseLong(lastNowString);
-                    long now = Long.parseLong(TimeUtil.getNowString());
-
-                    if (now - lastNow > (long) 120 * 1000) {
-                        redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
-                        SshUtil.execute(session, "kubectl delete job project-" + projectId);
-                        Thread.sleep(15000);
-                        while (true) {
-                            log.info("TickScheduler-------checkProject 准备重启项目 " + projectId);
-                            String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-                            log.info("TickScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
-                            int taskNumber2 = StringUtil.countSubString(podList2, "project");
-                            if (taskNumber2 == 0) {
-                                break;
-                            }
-                        }
-                        Thread.sleep(15000);
-                        log.info("TickScheduler-------checkProject 重新执行项目" + projectId);
-                        String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
-                        SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
-                    }
-                }
-            } catch (IOException | InterruptedException e) {
-                e.printStackTrace();
-            }
-        });
-
-        session.close();
-        client.stop();
-
-    }
-}
+//
+//    @Scheduled(fixedDelay = 60 * 1000)
+//    public void tick() throws IOException {
+//
+//        SshClient client = SshUtil.getClient();
+//        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
+//
+//        ArrayList<TaskPO> executingTaskList = taskMapper.selectExecuting();
+//        if (!CollectionUtil.isEmpty(executingTaskList)) {  // fixed: was isEmpty — inverted guard skipped the heartbeat loop whenever tasks existed
+//            //        log.info("------- TickScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
+//            //2 根据 key 查出任务的心跳时间
+//            for (TaskPO task : executingTaskList) {
+//                String taskId = task.getId();
+//                String projectId = task.getPId();
+//                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick");
+////                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
+//                assert s != null;
+//                long tickTime = Long.parseLong(s);
+//                long timeout = 2 * 60 * 1000L;
+//                long now = TimeUtil.getNow();
+//                long difference = now - tickTime;
+////                log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
+//                if (difference > timeout) {
+//                    String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
+//                    String podDeleteCommand = "kubectl delete pod " + podName;
+//                    if (podName != null) {
+//                        log.info("TickScheduler--tick 修改任务 " + taskId + "已超时,状态修改为 Aborted,pod 名称为:" + podName
+//                                + ",并执行删除 pod 命令:" + podDeleteCommand);
+//                        SshUtil.execute(session, podDeleteCommand);
+////            taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql()); // 如果任务 abort 代表项目失败
+//                        taskMapper.updateFailStateWithStopTime(taskId, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_1);
+//                        redisTemplate.delete("podName:" + taskId);
+//                    }
+//                }
+//            }
+//        }
+//        session.close();
+//        client.stop();
+//    }
+//
+//
+//    /**
+//     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
+//     */
+//    @Scheduled(fixedDelay = 30 * 1000)
+//    public void checkProject() throws IOException {
+//        SshClient client = SshUtil.getClient();
+//        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
+//
+//        //1 查询出正在运行中的 project
+//        List<String> projectIdList = projectMapper.selectIdByState("20");
+//        log.info("TickScheduler-------checkProject 查询出正在运行中的 project" + projectIdList);
+//        //2 根据 projectId 获取 pod
+//        projectIdList.forEach(projectId -> {
+//            try {
+//                String key = manualProjectTopic + ":" + projectId + ":check";
+//                String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
+//                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+//                log.info("TickScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
+//                int taskNumber = StringUtil.countSubString(podList, "project");
+//                if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
+//                    redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
+//                }
+//
+//                if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {
+//                    // 判断两次是否超过2分钟
+//                    //3 如果 pod 为空,则重启 job
+//                    long lastNow = Long.parseLong(lastNowString);
+//                    long now = Long.parseLong(TimeUtil.getNowString());
+//
+//                    if (now - lastNow > (long) 120 * 1000) {
+//                        redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
+//                        SshUtil.execute(session, "kubectl delete job project-" + projectId);
+//                        Thread.sleep(15000);
+//                        while (true) {
+//                            log.info("TickScheduler-------checkProject 准备重启项目 " + projectId);
+//                            String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+//                            log.info("TickScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
+//                            int taskNumber2 = StringUtil.countSubString(podList2, "project");
+//                            if (taskNumber2 == 0) {
+//                                break;
+//                            }
+//                        }
+//                        Thread.sleep(15000);
+//                        log.info("TickScheduler-------checkProject 重新执行项目" + projectId);
+//                        String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
+//                        SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
+//                    }
+//                }
+//            } catch (IOException | InterruptedException e) {
+//                e.printStackTrace();
+//            }
+//        });
+//
+//        session.close();
+//        client.stop();
+//
+//    }
+//}

+ 95 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/KubernetesUtil.java

@@ -0,0 +1,95 @@
+package com.css.simulation.resource.scheduler.util;
+
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.BatchV1Api;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Namespace;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1PodList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class KubernetesUtil {
+
+
+    /**
+     * 创建 namespace 命名空间
+     *
+     * @param apiClient     客户端
+     * @param namespaceName 命名空间名称
+     * @return 命名空间对象
+     * @throws ApiException 异常
+     */
+    public static V1Namespace createNamespace(ApiClient apiClient, String namespaceName) throws ApiException {
+        CoreV1Api coreV1Api = new CoreV1Api(apiClient);
+        V1Namespace yaml = new V1Namespace();
+        //1 apiVersion
+        yaml.setApiVersion("v1");
+
+        //2 kind
+        yaml.setKind("Namespace");
+
+        //3 metadata
+        V1ObjectMeta metadata = new V1ObjectMeta();
+        metadata.setName(namespaceName);
+        metadata.setLabels(null);
+        yaml.setMetadata(metadata);
+
+        return coreV1Api.createNamespace(yaml, null, null, null, null);
+    }
+
+    public static List<String> getPod(ApiClient apiClient, String namespaceName) throws ApiException {
+
+        CoreV1Api api = new CoreV1Api(apiClient);
+        V1PodList list;
+        if (namespaceName == null || "".equals(namespaceName)) {
+            list = api.listNamespacedPod(
+                    "default",
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+        } else if ("all".equals(namespaceName)) {
+            list = api.listPodForAllNamespaces(
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+        } else {
+            list = api.listNamespacedPod(
+                    namespaceName,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+        }
+        return list.getItems().stream().map(pod -> Objects.requireNonNull(pod.getMetadata()).getName()).collect(Collectors.toList());
+    }
+
+
+    public static void deleteJob(ApiClient apiClient, String namespaceName, String jobName) throws ApiException {
+        BatchV1Api batchV1Api = new BatchV1Api(apiClient);
+        batchV1Api.deleteNamespacedJob(jobName, namespaceName, null, null, null, null, null, null);
+    }
+}

+ 43 - 2
simulation-resource-scheduler/src/test/java/com/css/simulation/resource/scheduler/SchedulerTest.java

@@ -6,13 +6,18 @@ import api.common.util.StringUtil;
 import api.common.util.TimeUtil;
 import com.css.simulation.resource.scheduler.mapper.IndexTemplateMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskIndexMapper;
+import com.css.simulation.resource.scheduler.mapper.TaskMapper;
 import com.css.simulation.resource.scheduler.pojo.po.IndexTemplatePO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskIndexPO;
+import com.css.simulation.resource.scheduler.util.KubernetesUtil;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import java.util.*;
@@ -26,14 +31,50 @@ public class SchedulerTest {
 
     @Autowired
     TaskIndexMapper taskIndexMapper;
-
-
+    @Autowired
+    TaskMapper taskMapper;
     @Autowired
     IndexTemplateMapper indexTemplateMapper;
+    @Autowired
+    StringRedisTemplate redisTemplate;
+    @Autowired
+    ApiClient apiClient;
 
     private final String USER_ID = "simulation-resource-scheduler";
 
 
+    @Test
+    public void redisTemplate() throws ApiException {
+        //1 从 redis 获取 手动运行项目的 key 列表
+        Set<String> keys = redisTemplate.keys("manualProject:*");
+
+        //2 根据 key 列表从 redis 获取 pod 列表
+        assert keys != null;
+        List<String> podKeyList = keys.stream().filter(key -> key.contains("pod")).collect(Collectors.toList());
+        Map<String, String> podNameMapShouldBe = new HashMap<>();
+        podKeyList.forEach(podKey -> {
+            String podName = redisTemplate.opsForValue().get(podKey);
+            podNameMapShouldBe.put(podKey, podName);
+        });
+        //3 通过 kubernetes 获取 pod 列表
+        List<String> podNameListReally = KubernetesUtil.getPod(apiClient, "all");
+        //4 比对 redis 中的 pod 列表 和 kubernetes 中的 pod 列表,如果有 redis 中存在但 kubernetes 中不存在则准备重试
+        podNameMapShouldBe.entrySet().forEach(podNameEntryShouldBe -> {
+            String podKeyShouldBe = podNameEntryShouldBe.getKey();
+            String podNameShouldBe = podNameEntryShouldBe.getValue();
+            if (!podNameListReally.contains(podNameShouldBe)) {
+
+                //4-1 根据 podKey 获取 taskId
+                String taskId = podKeyShouldBe.split(":")[2];
+                //4-2 根据 taskId 列表从 mysql 获取 taskPO 列表
+                int retryCount = taskMapper.selectRetryCountById(taskId);  // fixed: was a dangling expression (no call/semicolon) — verify mapper signature
+                //4-3 查看重试次数是否为 3 ,如果重试次数小于 3 则从 redis 获取 message 并重新发送给 kafka 并重试次数 +1
+            }
+        });
+
+        System.out.println(podNameMapShouldBe);  // fixed: 'all' was never declared — print the redis-side pod map instead
+    }
+
     @Test
     public void test() {