martin преди 2 години
родител
ревизия
a6aa52c8e2

+ 7 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -216,7 +216,12 @@ public class ProjectConsumer {
         Long maxSimulationTime = projectMessageDTO.getMaxSimulationTime(); // 最大仿真时间,即生成视频的时间长度
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
-        String userId = manualProjectMapper.selectCreateUserById(projectId);
+        String userId = null;
+        if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
+            userId = manualProjectMapper.selectCreateUserById(projectId);
+        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
+            userId = autoSubProjectMapper.selectCreateUserById(projectId);
+        }
         // -------------------------------- 1 查询场景 --------------------------------
         //1-1 根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
         List<ScenePO> scenePOList = projectService.handlePackage(projectRunningPrefix, projectId, packageId);
@@ -227,7 +232,7 @@ public class ProjectConsumer {
         List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
         List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
         // -------------------------------- 3 发送任务消息 --------------------------------
-        projectService.sendTaskMessage(projectRunningPrefix, userId, projectId,projectType, maxSimulationTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
+        projectService.sendTaskMessage(projectRunningPrefix, userId, projectId, projectType, maxSimulationTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
         // -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
         String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------

+ 13 - 5
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -2,6 +2,7 @@ package com.css.simulation.resource.scheduler.manager;
 
 import api.common.pojo.constants.DictConstants;
 import api.common.util.*;
+import com.css.simulation.resource.scheduler.mapper.AutoSubProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.IndexMapper;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
@@ -64,6 +65,8 @@ public class TaskManager {
     @Resource
     ManualProjectMapper manualProjectMapper;
     @Resource
+    AutoSubProjectMapper autoSubProjectMapper;
+    @Resource
     KafkaTemplate<String, String> kafkaTemplate;
     @Resource
     TaskIndexManager taskIndexManager;
@@ -155,8 +158,8 @@ public class TaskManager {
                 return false;
             }
             String taskJson = stringRedisTemplate.opsForValue().get(taskMessageKey);
+            log.info("TaskManager--retry 重试项目 " + projectId + " 的任务 " + taskId + ",重试次数为:" + retry + ",重新发送的消息为:" + taskJson);
             retry++;
-            log.info("TaskService--retry 重试项目 " + projectId + " 的任务 " + taskId + ",重试次数为:" + retry + ",重新发送的消息为:" + taskJson);
             stringRedisTemplate.opsForValue().set(taskRetryKey, retry + "");
             kafkaTemplate.send(projectId, taskJson);
             return true;
@@ -172,10 +175,15 @@ public class TaskManager {
     }
 
     @SneakyThrows
-    public void score(PrefixTO redisPrefix, String userId, String projectId, ClientSession session) {
+    public void score(PrefixTO redisPrefix, String userId, String projectId, String projectType, ClientSession session) {
         // -------------------------------- 打分 --------------------------------
-        ProjectPO projectPO = manualProjectMapper.selectById(projectId);
-        String packageId = projectPO.getScenePackageId();  // 场景测试包 id,指标根 id
+        ProjectPO projectPO = null;
+        if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
+            projectPO = manualProjectMapper.selectById(projectId);
+        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
+            projectPO = autoSubProjectMapper.selectById(projectId);
+        }
+        String packageId = projectPO.getScenePackageId();  // 场景测试包 id,指标的rootId
         List<TaskPO> taskList = taskMapper.selectTaskListByProjectId(projectId);  // 所有任务信息
         if (CollectionUtil.isEmpty(taskList)) {
             log.error("TaskManager--score 项目 " + projectId + " 下没有查询到任务!");
@@ -212,7 +220,7 @@ public class TaskManager {
             List<TaskPO> taskListOfLeafIndex = taskList.stream()
                     .filter(task -> indexId.equals(task.getLastTargetId()))
                     .collect(Collectors.toList());
-            log.info("TaskService--state 叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个成功运行结束任务!");
+            log.info("TaskService--state 叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个任务!");
             // 计算叶子指标的得分
             taskListOfLeafIndex.forEach(taskOfLeaf -> {
                 String task2Id = taskOfLeaf.getId();

+ 7 - 8
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/AutoSubProjectMapper.java

@@ -5,7 +5,6 @@ import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import org.apache.ibatis.annotations.*;
 import org.apache.ibatis.type.JdbcType;
 
-import java.sql.Timestamp;
 import java.util.List;
 
 @Mapper
@@ -18,6 +17,13 @@ public interface AutoSubProjectMapper {
             @Result(column = "parallelism", property = "parallelism", jdbcType = JdbcType.VARCHAR),
             @Result(column = "project_type", property = "projectType", jdbcType = JdbcType.VARCHAR)
     })
+    @Select("select sas.id, sap.scene, sas.create_user_id\n" +
+            "from simulation_automatic_subproject sas\n" +
+            "         left join simulation_automatic_project sap on sas.parent_id = sap.id\n" +
+            "where sas.id = #{projectId}")
+    ProjectPO selectById(@Param("projectId") String projectId);
+
+    @ResultMap("project")
     @Select("select sas.id, sap.scene, sas.create_user_id, sap.parallelism, '2' project_type\n" +
             "from simulation_automatic_subproject sas\n" +
             "         left join simulation_automatic_project sap on sas.parent_id = sap.id\n" +
@@ -36,11 +42,4 @@ public interface AutoSubProjectMapper {
             "where id = #{id}")
     void updateNowRunStateById(@Param("nowRunState") String nowRunState, @Param("id") String id);
 
-    @Update("update simulation_automatic_subproject\n" +
-            "set now_run_state  = #{nowRunState},\n" +
-            "    finish_time = #{finishTime}\n" +
-            "where id = #{id}")
-    void updateNowRunStateAndFinishTimeById(@Param("nowRunState") String state, @Param("finishTime") Timestamp finishTime, @Param("id") String id);
-
-
 }

+ 8 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/IndexMapper.java

@@ -75,18 +75,22 @@ public interface IndexMapper {
             "       or scene_statue_ids like #{idExtend}\n" +
             "       or scene_generalization_ids like #{idExtend}\n" +
             "       )")
-    List<String> selectLeafIndexIdByProjectIdAndSceneIdOfManualProject(@Param("projectId") String projectId, @Param("idExtend") String idExtend);
+    List<String> selectLeafIndexIdByManualProjectIdAndSceneId(@Param("projectId") String projectId, @Param("idExtend") String idExtend);
 
     @Select("select sublist_id\n" +
-            "from scene_package_sublist\n" +
+            "from scene_package_sublist sps\n" +
             "where is_deleted = '0'\n" +
-            "  and root_id = (select scene from simulation_automatic_project sap where id = #{projectId})\n" +
+            "  and root_id = (select sap.scene\n" +
+            "                 from simulation_automatic_project sap\n" +
+            "                 where id = (select sas.parent_id\n" +
+            "                             from simulation_automatic_subproject sas\n" +
+            "                             where id = #{projectId}))\n" +
             "  and (scene_natural_ids like #{idExtend}\n" +
             "    or scene_traffic_ids like #{idExtend}\n" +
             "    or scene_statue_ids like #{idExtend}\n" +
             "    or scene_generalization_ids like #{idExtend}\n" +
             "    )")
-    List<String> selectLeafIndexIdByProjectIdAndSceneIdOfAutoSubProject(@Param("projectId") String projectId, @Param("idExtend") String idExtend);
+    List<String> selectLeafIndexIdByAutoSubProjectIdAndSceneId(@Param("projectId") String projectId, @Param("idExtend") String idExtend);
 
 
     @Delete("delete from simulation_mpt_first_target_score where p_id = #{projectId}")

+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -170,9 +170,9 @@ public class ProjectService {
             //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
             List<String> lastTargetIdList = null;
             if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-                lastTargetIdList = indexMapper.selectLeafIndexIdByProjectIdAndSceneIdOfManualProject(projectId, "%" + sceneId + "%");
+                lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
             } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-                lastTargetIdList = indexMapper.selectLeafIndexIdByProjectIdAndSceneIdOfAutoSubProject(projectId, "%" + sceneId + "%");
+                lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
             }
             lastTargetIdList.forEach(lastTargetId -> {
                 String taskId = StringUtil.getRandomUUID();

+ 20 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -1,8 +1,12 @@
 package com.css.simulation.resource.scheduler.service;
 
+import api.common.pojo.constants.DictConstants;
 import api.common.util.SshUtil;
 import com.css.simulation.resource.scheduler.manager.TaskManager;
+import com.css.simulation.resource.scheduler.mapper.AutoSubProjectMapper;
+import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
@@ -30,6 +34,10 @@ public class TaskService {
     @Resource
     TaskMapper taskMapper;
     @Resource
+    ManualProjectMapper manualProjectMapper;
+    @Resource
+    AutoSubProjectMapper autoSubProjectMapper;
+    @Resource
     ProjectUtil projectUtil;
 
     // -------------------------------- Comment --------------------------------
@@ -45,6 +53,17 @@ public class TaskService {
             return;
         }
         String projectId = taskPO.getPId();
+        String projectType = null;
+
+        // 根据 projectId 获取 projectType
+        ProjectPO manualProjectPO = manualProjectMapper.selectById(projectId);
+        ProjectPO autoSubProjectPO = autoSubProjectMapper.selectById(projectId);
+        if (manualProjectPO != null) {
+            projectType = DictConstants.PROJECT_TYPE_MANUAL;
+        } else if (autoSubProjectPO != null) {
+            projectType = DictConstants.PROJECT_TYPE_AUTO_SUB;
+        }
+
         String userId = taskPO.getCreateUserId();
         PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaksId(userId, projectId, taskId);
         //1 判断项目是否已完成
@@ -58,7 +77,7 @@ public class TaskService {
         log.info("TaskService--taskState 项目 " + projectId + "准备打分!");
         taskManager.prepareScore(redisPrefix.getProjectRunningKey());
         //3 打分
-        taskManager.score(redisPrefix, userId, projectId, clientSession);
+        taskManager.score(redisPrefix, userId, projectId,projectType, clientSession);
         //4 结束
         taskManager.done(redisPrefix, sshClient, clientSession, projectId);
         log.info("TaskService--taskState 项目 " + projectId + " 执行完成!");

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -46,7 +46,7 @@ public class ProjectUtil {
 
     public KubernetesNodeTO getMaxParallelismNode() {
         List<KubernetesNodeTO> nodeList = kubernetesConfiguration.getNodeList();
-        log.info("ProjectUtil--getMaxParallelismNode kubernetes 节点列表为:" + nodeList);
+//        log.info("ProjectUtil--getMaxParallelismNode kubernetes 节点列表为:" + nodeList);
         String maxRestParallelismNode = "master";
         long maxRestParallelism = 0L;
         for (KubernetesNodeTO kubernetesNodeTO : nodeList) {