martin 3 éve
szülő
commit
6323b5461d

+ 14 - 13
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/IndexTemplateMapper.java

@@ -25,21 +25,22 @@ public interface IndexTemplateMapper {
             @Result(column = "root_id", property = "rootId", jdbcType = JdbcType.VARCHAR),
             @Result(column = "package_level", property = "packageLevel", jdbcType = JdbcType.VARCHAR),
             @Result(column = "rule_name", property = "ruleName", jdbcType = JdbcType.VARCHAR),
-            @Result(column = "rule_details", property = "ruleDetails", jdbcType = JdbcType.VARCHAR),
             @Result(column = "package_and_rules", property = "ruleId", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "rule_details", property = "ruleDetails", jdbcType = JdbcType.VARCHAR)
     })
-    @Select("select sublist_id,\n" +
-            "       scene_natural_ids,\n" +
-            "       scene_traffic_ids,\n" +
-            "       scene_statue_ids,\n" +
-            "       weight,\n" +
-            "       parent_id,\n" +
-            "       root_id,\n" +
-            "       package_level,\n" +
-            "       rule_name,\n" +
-            "       package_and_rules\n" +
-            "from scene_package_sublist\n" +
-            "where root_id = #{packageId}")
+    @Select("select sps.sublist_id,\n" +
+            "       sps.scene_natural_ids,\n" +
+            "       sps.scene_traffic_ids,\n" +
+            "       sps.scene_statue_ids,\n" +
+            "       sps.weight,\n" +
+            "       sps.parent_id,\n" +
+            "       sps.root_id,\n" +
+            "       sps.package_level,\n" +
+            "       sps.rule_name,\n" +
+            "       sps.package_and_rules,\n" +
+            "       sr.rule_details\n" +
+            "from scene_package_sublist sps left join scoring_rules sr on sps.package_and_rules = sr.rules_id\n" +
+            "where sps.root_id = #{packageId}")
     List<IndexTemplatePO> selectByPackageIdIncludeDeleted(@Param("packageId") String packageId);
 
 

+ 1 - 9
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/ManualProjectMapper.java

@@ -22,6 +22,7 @@ public interface ManualProjectMapper {
             "where id = #{projectId}")
     ProjectPO selectById(@Param("projectId")String projectId);
 
+
     @Update("update simulation_manual_project\n" +
             "set now_run_state  = #{state},\n" +
             "    task_completed = '0'\n" +
@@ -46,16 +47,7 @@ public interface ManualProjectMapper {
     void updateTaskCompleted(@Param("id") String id, @Param("taskCompleted") int taskCompleted);
 
 
-    @Select("select task_number\n" +
-            "from simulation_manual_project\n" +
-            "where id = #{id}")
-    int selectTaskNumById(@Param("id") String id);
 
-    @Select("select count(1)\n" +
-            "from simulation_manual_project_task\n" +
-            "where run_state in ('Aborted', 'PendingAnalysis', 'Terminated')\n" +
-            "  and p_id = #{projectId}")
-    int selectEndTaskNum(@Param("projectId") String projectId);
 
     @Select("select *\n" +
             "from simulation_manual_project\n" +

+ 12 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -126,4 +126,16 @@ public interface TaskMapper {
 
     @Delete("delete from simulation_manual_project_task where p_id = #{projectId}")
     void deleteByProject(@Param("projectId") String projectId);
+
+
+    @Select("select count(1)\n" +
+            "from simulation_manual_project_task\n" +
+            "where p_id = #{projectId}")
+    int selectTaskNumByProjectId(@Param("projectId") String projectId);
+
+    @Select("select count(1)\n" +
+            "from simulation_manual_project_task\n" +
+            "where run_state in ('Aborted', 'PendingAnalysis', 'Terminated')\n" +
+            "  and p_id = #{projectId}")
+    int selectEndTaskNumByProjectId(@Param("projectId") String projectId);
 }

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/po/IndexTemplatePO.java

@@ -22,7 +22,7 @@ public class IndexTemplatePO {
     private String ruleDetails; // 打分规则代码
     private Double tempScore;
     private Integer packageLevel; // 指标等级
-    private Integer ruleId; // 指标等级
+    private String ruleId; // 指标等级
 
 
 }

+ 9 - 8
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -21,7 +21,6 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -66,9 +65,10 @@ public class ProjectScheduler {
 
         SshClient client = SshUtil.getClient();
         ClientSession session = SshUtil.getSession(client, hostname, username, password);
-        ArrayList<TaskPO> executingTaskList = taskMapper.selectExecuting();
+        List<TaskPO> executingTaskList = taskMapper.selectExecuting();
+
+        log.info("ProjectScheduler--timeout 正在运行的任务有:" + executingTaskList);
         if (CollectionUtil.isEmpty(executingTaskList)) {
-            //2 根据 key 查出任务的心跳时间
             executingTaskList.forEach(task -> {
                 String taskId = task.getId();
                 String projectId = task.getPId();
@@ -83,6 +83,7 @@ public class ProjectScheduler {
         session.close();
         client.stop();
     }
+
     /**
      * 解决 pod 莫名全部关闭但是 job 还在的问题
      * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
@@ -98,15 +99,15 @@ public class ProjectScheduler {
         //2 根据 projectId 获取 pod
         projectIdList.forEach(projectId -> {
             try {
-                String key = manualProjectTopic + ":" + projectId + ":check";
-                String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
+
+                String checkKey = manualProjectTopic + ":" + projectId + ":check";
+                String lastNowString = redisTemplate.opsForValue().get(checkKey);
                 String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
                 log.info("ProjectScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
                 int taskNumber = StringUtil.countSubString(podList, "project");
                 if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
-                    redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
+                    redisTemplate.opsForValue().set(checkKey, TimeUtil.getNowString());
                 }
-
                 if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {
                     // 判断两次是否超过2分钟
                     //3 如果 pod 为空,则重启 job
@@ -114,7 +115,7 @@ public class ProjectScheduler {
                     long now = Long.parseLong(TimeUtil.getNowString());
 
                     if (now - lastNow > (long) 120 * 1000) {
-                        redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
+                        redisTemplate.opsForValue().set(checkKey, TimeUtil.getNowString());
                         SshUtil.execute(session, "kubectl delete job project-" + projectId);
                         Thread.sleep(15000);
                         while (true) {

+ 8 - 6
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ManualProjectService.java

@@ -61,7 +61,10 @@ public class ManualProjectService {
     public void prepare(String manualProjectTopic, String projectId) {
 
         //1 redis 设置项目已完成任务为 0
-        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":taskCompleted", "0");
+        Set<String> oldKeys = stringRedisTemplate.keys(manualProjectTopic + ":" + projectId + "*");
+        assert oldKeys != null;
+        stringRedisTemplate.delete(oldKeys);
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":completed", "0");
 
         try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
             ManualProjectMapper manualProjectMapper = sqlSession.getMapper(ManualProjectMapper.class);
@@ -78,7 +81,6 @@ public class ManualProjectService {
         try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
             IndexTemplateMapper indexTemplateMapper = sqlSession.getMapper(IndexTemplateMapper.class);
             SceneMapper sceneMapper = sqlSession.getMapper(SceneMapper.class);
-            ManualProjectMapper manualProjectMapper = sqlSession.getMapper(ManualProjectMapper.class);
             //1 查询该场景包的所有指标列表存入 redis
             List<IndexTemplatePO> allIndexList = indexTemplateMapper.selectByPackageIdIncludeDeleted(packageId);
             stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + packageId + ":all", JsonUtil.listToJson(allIndexList));
@@ -109,14 +111,14 @@ public class ManualProjectService {
             return sceneList;
 
         } catch (Exception e) {
-            throw new RuntimeException("ManualProjectService--handlePackage 场景包处理出错!");
+            throw new RuntimeException("ManualProjectService--handlePackage 场景包处理出错:" + e.getMessage());
         }
 
 
     }
 
 
-    public void sendTaskMessage(String manualProjectTopic, String projectId,int maxSimulationTime, Set<ScenePO> scenePOSet, VehiclePO vehiclePO, List<CameraPO> cameraPOList, List<OgtPO> ogtPOList) {
+    public void sendTaskMessage(String manualProjectTopic, String projectId, int maxSimulationTime, Set<ScenePO> scenePOSet, VehiclePO vehiclePO, List<CameraPO> cameraPOList, List<OgtPO> ogtPOList) {
 
         try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
             IndexMapper indexMapper = sqlSession.getMapper(IndexMapper.class);
@@ -218,14 +220,14 @@ public class ManualProjectService {
                     }, failure -> {
                         log.error("------- 发送消息失败:" + failure.getMessage());
                     });
-                    messageNumber[0] = messageNumber[0] +1;
+                    messageNumber[0] = messageNumber[0] + 1;
                     stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":message", taskJson);
                 });
             });
             log.info("------- ManualProjectConsumer 共发送了" + messageNumber[0] + " 条消息!");
             sqlSession.commit();
         } catch (Exception e) {
-            throw new RuntimeException("ManualProjectService--sendTaskMessage 发送任务消息出错!");
+            throw new RuntimeException("ManualProjectService--sendTaskMessage 发送任务消息出错:" + e.getMessage());
         }
 
 

+ 4 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -143,8 +143,8 @@ public class TaskService {
             }
             SshUtil.execute(session, podDeleteCommand);
         }
-        int taskNum = manualProjectMapper.selectTaskNumById(projectId);
-        int endTaskNum = manualProjectMapper.selectEndTaskNum(projectId);    // 查询已结束的任务 'Aborted', 'PendingAnalysis', 'Terminated'
+        int taskNum = taskMapper.selectTaskNumByProjectId(projectId);
+        int endTaskNum = taskMapper.selectEndTaskNumByProjectId(projectId);    // 查询已结束的任务 'Aborted', 'PendingAnalysis', 'Terminated'
         manualProjectMapper.updateTaskCompleted(projectId, endTaskNum);
         log.info("TaskService--state 项目 " + projectId + " 完成进度为:" + endTaskNum + "/" + taskNum);
         if (taskNum != endTaskNum) {  // 已结束任务数等于所有任务数量,才会准备打分;否则退出。
@@ -329,6 +329,7 @@ public class TaskService {
             return false;
         }
         String taskJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":message");
+        log.info("TaskService--retry 重试项目 " + projectId + " 的任务 " + taskId + ",重新发送的消息为:" + taskJson);
         kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
             // 消息发送到的topic
             assert success != null;
@@ -433,9 +434,8 @@ public class TaskService {
 
     public void taskTick(String taskId) {
         log.info("TaskService--taskTick 任务 " + taskId + "心跳!");
+        String projectId = taskMapper.selectProjectIdById(taskId);
         // 刷新 redis 心跳时间
-        ProjectPO projectPO = manualProjectMapper.selectById(taskId);
-        String projectId = projectPO.getId();
         stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick", TimeUtil.getNowString());
     }