martin 3 年之前
父节点
当前提交
d5dcf385f7

+ 15 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -76,6 +76,18 @@ public class ManualProjectConsumer {
     String jobYaml;
 
 
+
+
+//    /**
+//     * 任务运行前首先判断用户是否可以分配
+//     *
+//     * @param projectRecord 项目启动消息
+//     */
+//    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.topic}")
+//    @SneakyThrows
+//    public void dispatchManualProject(ConsumerRecord<String, String> projectRecord) {}
+
+
     /**
      * 开始执行以及重新执行
      *
@@ -87,15 +99,16 @@ public class ManualProjectConsumer {
 
         // -------------------------------- 0 准备 --------------------------------
         log.info("------- ManualProjectConsumer 接收到项目开始消息为:" + projectRecord);
+        String projectJson = projectRecord.value();
         //1 读取 kafka 的 project 信息
-        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectRecord.value(), ProjectMessageDTO.class);
+        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
         String projectId = projectMessageDTO.getProjectId();    // 项目 id
         String packageId = projectMessageDTO.getScenePackageId();   // 场景测试包 id
         int maxSimulationTime = projectMessageDTO.getMaxSimulationTime(); // 最大仿真时间,即生成视频的时间长度
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
         //2 执行前准备,删除该项目下所有任务,即重新执行该项目时需要新的测试包
-        manualProjectService.prepare(manualProjectTopic, projectId);
+        manualProjectService.prepare(manualProjectTopic, projectId,projectJson);
         // -------------------------------- 1 查询场景 --------------------------------
         //1-1 根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
         List<ScenePO> scenePOList = manualProjectService.handlePackage(manualProjectTopic, projectId, packageId);

+ 1 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -91,11 +91,9 @@ public interface TaskMapper {
     void updateSuccessStateAndScoreResultWithStopTime(@Param("task") TaskPO task, @Param("runState") String runState, @Param("runStopTime") Timestamp runStopTime);
 
     @ResultMap("task")
-    @Select("select smpt.id, smpt.p_id, smp.max_simulation_time\n" +
+    @Select("select id, p_id\n" +
             "from simulation_manual_project_task smpt\n" +
-            "         left join simulation_manual_project smp on smpt.p_id = smp.id\n" +
             "where smpt.is_deleted = '0'\n" +
-            "  and smp.is_deleted = '0'\n" +
             "  and smpt.run_state in ('Running')")
     ArrayList<TaskPO> selectExecuting();
 

+ 11 - 13
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -1,7 +1,6 @@
 package com.css.simulation.resource.scheduler.scheduler;
 
 import api.common.pojo.constants.DictConstants;
-import api.common.util.CollectionUtil;
 import api.common.util.SshUtil;
 import api.common.util.StringUtil;
 import api.common.util.TimeUtil;
@@ -59,7 +58,7 @@ public class ProjectScheduler {
      * @throws IOException 超时时间
      */
     @Scheduled(fixedDelay = 60 * 1000)
-    public void timeout() throws IOException {
+    public void taskTimeout() throws IOException {
 
         long timeout = 2 * 60 * 1000L;
 
@@ -67,9 +66,9 @@ public class ProjectScheduler {
         ClientSession session = SshUtil.getSession(client, hostname, username, password);
         List<TaskPO> executingTaskList = taskMapper.selectExecuting();
 
-        log.info("ProjectScheduler--timeout 正在运行的任务有:" + executingTaskList);
-        if (CollectionUtil.isEmpty(executingTaskList)) {
-            executingTaskList.forEach(task -> {
+        log.info("ProjectScheduler--taskTimeout 正在运行的任务有:" + executingTaskList);
+        if (executingTaskList != null && executingTaskList.size() > 0) {
+            for (TaskPO task : executingTaskList) {
                 String taskId = task.getId();
                 String projectId = task.getPId();
                 long lastTickTime = Long.parseLong(Objects.requireNonNull(redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick")));
@@ -77,8 +76,7 @@ public class ProjectScheduler {
                     String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
                     taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
                 }
-            });
-
+            }
         }
         session.close();
         client.stop();
@@ -89,13 +87,13 @@ public class ProjectScheduler {
      * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
      */
     @Scheduled(fixedDelay = 30 * 1000)
-    public void checkProject() throws IOException {
+    public void projectCheck() throws IOException {
         SshClient client = SshUtil.getClient();
         ClientSession session = SshUtil.getSession(client, hostname, username, password);
 
         //1 查询出正在运行中的 project
         List<String> projectIdList = projectMapper.selectIdByState("20");
-        log.info("ProjectScheduler-------checkProject 查询出正在运行中的 project" + projectIdList);
+        log.info("ProjectScheduler--projectCheck 查询出正在运行中的 project" + projectIdList);
         //2 根据 projectId 获取 pod
         projectIdList.forEach(projectId -> {
             try {
@@ -103,7 +101,7 @@ public class ProjectScheduler {
                 String checkKey = manualProjectTopic + ":" + projectId + ":check";
                 String lastNowString = redisTemplate.opsForValue().get(checkKey);
                 String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-                log.info("ProjectScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
+                log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
                 int taskNumber = StringUtil.countSubString(podList, "project");
                 if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
                     redisTemplate.opsForValue().set(checkKey, TimeUtil.getNowString());
@@ -119,16 +117,16 @@ public class ProjectScheduler {
                         SshUtil.execute(session, "kubectl delete job project-" + projectId);
                         Thread.sleep(15000);
                         while (true) {
-                            log.info("ProjectScheduler-------checkProject 准备重启项目 " + projectId);
+                            log.info("ProjectScheduler--projectCheck 准备重启项目 " + projectId);
                             String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-                            log.info("ProjectScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
+                            log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
                             int taskNumber2 = StringUtil.countSubString(podList2, "project");
                             if (taskNumber2 == 0) {
                                 break;
                             }
                         }
                         Thread.sleep(15000);
-                        log.info("ProjectScheduler-------checkProject 重新执行项目" + projectId);
+                        log.info("ProjectScheduler--projectCheck 重新执行项目" + projectId);
                         String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
                         SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
                     }

+ 2 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ManualProjectService.java

@@ -58,13 +58,14 @@ public class ManualProjectService {
 
     // -------------------------------- Comment --------------------------------
 
-    public void prepare(String manualProjectTopic, String projectId) {
+    public void prepare(String manualProjectTopic, String projectId, String projectJson) {
 
         //1 redis 设置项目已完成任务为 0
         Set<String> oldKeys = stringRedisTemplate.keys(manualProjectTopic + ":" + projectId + "*");
         assert oldKeys != null;
         stringRedisTemplate.delete(oldKeys);
         stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":completed", "0");
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":start", projectJson);
 
         try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
             ManualProjectMapper manualProjectMapper = sqlSession.getMapper(ManualProjectMapper.class);

+ 11 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -135,10 +135,18 @@ public class TaskService {
                 }
                 taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), targetEvaluate);
             } else if ("Terminated".equals(state)) {
+                if (retry(projectId, taskId)) {
+                    taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
+                    return;
+                }
                 taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_3);
             } else if ("PendingAnalysis".equals(state)) {
                 taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
             } else {
+                if (retry(projectId, taskId)) {
+                    taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
+                    return;
+                }
                 taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_4);
             }
             SshUtil.execute(session, podDeleteCommand);
@@ -329,7 +337,9 @@ public class TaskService {
             return false;
         }
         String taskJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":message");
-        log.info("TaskService--retry 重试项目 " + projectId + " 的任务 " + taskId + ",重新发送的消息为:" + taskJson);
+        retry++;
+        log.info("TaskService--retry 重试项目 " + projectId + " 的任务 " + taskId + ",重试次数为:" + retry + ",重新发送的消息为:" + taskJson);
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry", retry + "");
         kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
             // 消息发送到的topic
             assert success != null;