martin 3 anni fa
parent
commit
09f162ccb4

+ 7 - 7
api-common/src/main/java/api/common/pojo/constants/DictConstants.java

@@ -25,13 +25,13 @@ public class DictConstants {
     public static final String SENSOR_GPS = "5"; // GPS 传感器表
 
     public static final String TASK_PENDING = "Pending"; // 任务执行状态,待执行
-    public static final String TASK_RUNNING = "Running"; // 任务执行状态,待执行
-    public static final String TASK_ABORTED = "Aborted"; // 任务执行状态,待执行
-    public static final String TASK_ANALYSIS = "Analysis"; // 任务执行状态,待执行
-    public static final String TASK_ANALYSING = "Analysing"; // 任务执行状态,待执行
-    public static final String TASK_COMPLETED = "Completed"; // 任务执行状态,待执行
-    public static final String TASK_TERMINATING = "Terminating"; // 任务执行状态,待执行
-    public static final String TASK_TERMINATED = "Terminated"; // 任务执行状态,待执行
+    public static final String TASK_RUNNING = "Running"; // 任务执行状态,运行中
+    public static final String TASK_ABORTED = "Aborted"; // 任务执行状态,中断
+    public static final String TASK_ANALYSIS = "Analysis"; // 任务执行状态,准备分析
+    public static final String TASK_ANALYSING = "Analysing"; // 任务执行状态,分析中
+    public static final String TASK_COMPLETED = "Completed"; // 任务执行状态,已完成
+    public static final String TASK_TERMINATING = "Terminating"; // 任务执行状态,终止中
+    public static final String TASK_TERMINATED = "Terminated"; // 任务执行状态,已终止
 
     public static final String REGULATION_TYPE = "regulationType"; // 法规类型
     public static final String SELF_DRIVING = "selfDriving"; // 自车驾驶行为

+ 2 - 0
api-common/src/main/java/api/common/pojo/param/RedisParameter.java

@@ -2,12 +2,14 @@ package api.common.pojo.param;
 
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 import javax.validation.constraints.NotBlank;
 
 @Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class RedisParameter {

+ 7 - 2
simulation-resource-common/src/main/java/com/css/simulation/resource/common/controller/RedisController.java

@@ -38,8 +38,13 @@ public class RedisController {
 
     @PostMapping("/set")
     public ResponseBodyVO<String> set(@RequestBody @Validated RedisParameter redisParameter) {
-        //2 存储键值对并设置时间。
-        redisTemplate.opsForValue().set(redisParameter.getKey(), redisParameter.getValue(), Duration.ofMinutes(redisParameter.getMinutes()));
+        if (redisParameter.getMinutes() == 0) { //2 存储键值对,永不过期。
+            redisTemplate.opsForValue().set(redisParameter.getKey(), redisParameter.getValue());
+        } else { //2 存储键值对并设置时间。
+            redisTemplate.opsForValue().set(redisParameter.getKey(), redisParameter.getValue(), Duration.ofMinutes(redisParameter.getMinutes()));
+        }
+
+
         return new ResponseBodyVO<>(ResponseBodyVO.Response.SUCCESS);
     }
 

+ 26 - 22
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -5,10 +5,8 @@ import api.common.pojo.constants.DictConstants;
 import api.common.pojo.dto.ProjectMessageDTO;
 import api.common.pojo.param.KafkaParameter;
 import api.common.pojo.param.MinioParameter;
-import api.common.util.FileUtil;
-import api.common.util.JsonUtil;
-import api.common.util.LinuxUtil;
-import api.common.util.StringUtil;
+import api.common.pojo.param.RedisParameter;
+import api.common.util.*;
 import com.css.simulation.resource.scheduler.feign.CommonService;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.dto.*;
@@ -22,9 +20,11 @@ import io.kubernetes.client.util.Yaml;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ResourceUtils;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -38,6 +38,8 @@ public class ManualProjectConsumer {
     @Autowired
     ProjectMapper projectMapper;
     @Autowired
+    TaskMapper taskMapper;
+    @Autowired
     IndexMapper indexMapper;
     @Autowired
     SceneMapper sceneMapper;
@@ -53,8 +55,12 @@ public class ManualProjectConsumer {
     private AlgorithmMapper algorithmMapper;
     @Autowired
     private ApiClient apiClient;
+    @Value("${spring.kafka.consumer.topic.manual-project}")
+    private String manualProjectTopic;
+    @Value("${scheduler.manual-project.result-path}")
+    private String manualProjectResultPath;
 
-    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "test")
+    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${spring.kafka.consumer.topic.manual-project}")
     public void testConsumer(ConsumerRecord<String, String> projectRecord) {
         System.out.println("------- 消费成功:" + projectRecord.value());
     }
@@ -96,7 +102,7 @@ public class ManualProjectConsumer {
         sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
         sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
         sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
-        projectMapper.updateTaskNumber(projectId,sceneList.size()); // 有多少场景就有多少任务
+        projectMapper.updateTaskNumber(projectId, sceneList.size()); // 有多少场景就有多少任务
         // -------------------------------- 2 模型 --------------------------------
         // 根据 vehicleId, 获取 模型信息和传感器信息
         String vehicleId = projectMessageDTO.getVehicleId();    // 模型 id
@@ -110,8 +116,9 @@ public class ManualProjectConsumer {
 
         for (ScenePO scenePO : sceneList) {
             String taskId = StringUtil.getRandomUUID();
-            String resultPath = "/project/manual-project/" + projectId + "/" + taskId;
-            TaskPO taskPO = TaskPO.builder()
+            String resultPath = manualProjectResultPath + projectId + "/" + taskId;
+            // 保存任务信息
+            TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
                     .id(taskId)
                     .pId(projectId)
                     .sceneId(scenePO.getId())
@@ -120,16 +127,16 @@ public class ManualProjectConsumer {
                     .runState(DictConstants.TASK_PENDING)
                     .runResult(resultPath)
                     .build();
-//            MonitorTaskPO taskPO = TaskPO.builder()
-//                    .id(taskId)
-//                    .pId(projectId)
-//                    .sceneId(scenePO.getId())
-//                    .sceneName(scenePO.getName())
-//                    .sceneType(scenePO.getType())
-//                    .runState(DictConstants.TASK_PENDING)
-//                    .runResult(resultPath)
-//                    .build();
-            //4-1 组装 task 消息
+            taskPO.setCreateTime(TimeUtil.getNowForMysql());
+            taskPO.setCreateUserId("simulation-resource-scheduler");
+            taskPO.setModifyTime(TimeUtil.getNowForMysql());
+            taskPO.setModifyUserId("simulation-resource-scheduler");
+            taskPO.setModifyTime(TimeUtil.getNowForMysql());
+            taskPO.setIsDeleted("0");
+            taskMapper.insert(taskPO);
+            // 心跳信息存在緩存中
+            commonService.set(new RedisParameter(manualProjectTopic + ":" + projectId + ":" + taskId, TimeUtil.getNow() + "", 0));
+            // 组装 task 消息
             TaskDTO taskDTO = TaskDTO.builder()
                     .info(InfoDTO.builder()
                             .project_id(projectId)
@@ -186,12 +193,11 @@ public class ManualProjectConsumer {
         // 下载算法文件到本地( 2 到仓库服务器)
         Response response = commonService.download(new MinioParameter(minioPath));
         InputStream inputStream = response.body().asInputStream();
-        FileUtil.writeInputStreamToLocalFile(inputStream,localPath);
+        FileUtil.writeInputStreamToLocalFile(inputStream, localPath);
         //4-2 本地执行 docker load 算法文件成镜像( 2 创建 ssh 连接)
         LinuxUtil.execute("docker load");
 
 
-
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
         int completions = sceneList.size();
         int parallelism = projectMessageDTO.getParallelism();    // 并行度
@@ -233,6 +239,4 @@ public class ManualProjectConsumer {
     }
 
 
-
-
 }

+ 9 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -4,6 +4,7 @@ package com.css.simulation.resource.scheduler.mapper;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import org.apache.ibatis.annotations.*;
 import org.apache.ibatis.type.JdbcType;
+import org.springframework.web.bind.annotation.RequestParam;
 
 import java.sql.Timestamp;
 import java.util.List;
@@ -37,14 +38,20 @@ public interface TaskMapper {
     @Update("update monitor_task\n" +
             "set tick_time = #{tickTime}\n" +
             "where task_id = #{id}")
-    void updateTickTime(@Param("id") String id, @Param("tickTime")Timestamp tickTime);
+    void updateTickTime(@Param("id") String id, @Param("tickTime") Timestamp tickTime);
 
     @Update("update simulation_manual_project_task smpt,monitor_task mt\n" +
             "set smpt.run_state = #{tickTime},\n" +
             "    mt.run_state= #{tickTime}\n" +
             "where smpt.id = #{id}\n" +
             "  and mt.task_id = #{id}")
-    void updateState(@Param("id") String id, @Param("runState")String runState);
+    void updateState(@Param("id") String id, @Param("runState") String runState);
 
 
+    @Insert("insert into simulation_manual_project_task(id, p_id, scene_id, scene_name, scene_type,\n" +
+            "                                           run_state, run_result, create_time, create_user_id, modify_time,\n" +
+            "                                           modify_user_id, is_deleted)\n" +
+            "values (#{task.id},#{task.pId},#{task.sceneId},#{task.sceneName},#{task.sceneType},#{task.runState},\n" +
+            "#{task.runResult},#{task.createTime},#{task.createUserId},#{task.modifyTime},#{modifyUserId},#{task.isDeleted})")
+    void insert(@RequestParam("task") TaskPO task);
 }

+ 11 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -1,6 +1,7 @@
 package com.css.simulation.resource.scheduler.service;
 
 import api.common.pojo.constants.DictConstants;
+import api.common.pojo.param.RedisParameter;
 import api.common.util.CollectionUtil;
 import api.common.util.StringUtil;
 import api.common.util.TimeUtil;
@@ -13,6 +14,7 @@ import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.util.ScoreUtil;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.util.*;
@@ -30,11 +32,18 @@ public class TaskService {
     private TaskMapper taskMapper;
     @Autowired
     private IndexMapper indexMapper;
-
+    @Value("${spring.kafka.consumer.topic.manual-project}")
+    private String manualProjectTopic;
 
     public void taskTick(String taskId) {
-        // 刷新 mysql 心跳时间
+        // 刷新 redis 心跳时间
+        ProjectPO projectPO = projectMapper.selectById(taskId);
+        String projectId = projectPO.getId();
         taskMapper.updateTickTime(taskId, TimeUtil.getNowForMysql());
+        commonService.set(RedisParameter.builder()
+                .key(manualProjectTopic + ":" + projectId + ":" + taskId)
+                .value(TimeUtil.getNow() + "")
+                .build());
     }
 
 
@@ -92,7 +101,6 @@ public class TaskService {
     }
 
 
-
     public Boolean taskConfirm(String taskId) {
         // 将 taskId 存储到 redis,并刷新过期时间
 //        commonService.set(new RedisParameter(taskId,));