martin před 3 roky
rodič
revize
f548624020

+ 36 - 21
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -8,10 +8,7 @@ import api.common.util.JsonUtil;
 import api.common.util.LinuxUtil;
 import api.common.util.TimeUtil;
 import com.css.simulation.resource.scheduler.mapper.*;
-import com.css.simulation.resource.scheduler.pojo.po.CameraPO;
-import com.css.simulation.resource.scheduler.pojo.po.OgtPO;
-import com.css.simulation.resource.scheduler.pojo.po.ScenePO;
-import com.css.simulation.resource.scheduler.pojo.po.VehiclePO;
+import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.service.ManualProjectService;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -75,8 +72,6 @@ public class ManualProjectConsumer {
     String jobYaml;
 
 
-
-
     /**
      * 任务运行前首先判断用户是否拥有可分配资源
      *
@@ -95,14 +90,36 @@ public class ManualProjectConsumer {
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
         //2 根据 projectId 获取创建用户 id
-        manualProjectMapper.selectCreateUserById(projectId);
-        //3 获取用户类型(管理员账户、管理员子账户、普通账户、普通账户子账户)(独占、共享)
-        //3-1 管理员账户和管理员子账户直接执行
-        //3-2 普通独占用户,根据自己的独占节点排队
-        //3-2 普通用户
-//        if(userMapper.selectUserTypeById)
-
-        parseManualProject(projectRecord);
+        String userId = manualProjectMapper.selectCreateUserById(projectId);
+        //3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
+        UserPO userPO = userMapper.selectById(userId);
+        String roleCode = userPO.getRoleCode();
+        String useType = userPO.getUseType();
+
+
+        if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
+            parseManualProject(projectRecord);
+        } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) {
+            if (DictConstants.USE_TYPE_EXCLUSIVE.equals(useType)) {   //3-2 普通独占账户,根据自己的独占节点排队
+                // 获取独占数量
+                // 获取该用户正在运行的项目数量
+                // 获取
+            } else {
+
+            }
+            parseManualProject(projectRecord);
+        } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
+            if (DictConstants.USE_TYPE_EXCLUSIVE.equals(useType)) {   //3-2 普通独占子账户,根据自己的独占节点排队
+
+            } else {
+
+            }
+            parseManualProject(projectRecord);
+        } else {
+            parseManualProject(projectRecord);
+        }
+
+
     }
 
 
@@ -125,24 +142,22 @@ public class ManualProjectConsumer {
         int maxSimulationTime = projectMessageDTO.getMaxSimulationTime(); // 最大仿真时间,即生成视频的时间长度
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
+        String userId = manualProjectMapper.selectCreateUserById(projectId);
         //2 执行前准备,删除改项目下所有任务,即重新执行改项目时需要新的测试包
-        manualProjectService.prepare(manualProjectTopic, projectId,projectJson);
+        manualProjectService.prepare(manualProjectTopic, userId, projectId, projectJson);
         // -------------------------------- 1 查询场景 --------------------------------
         //1-1 根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
-        List<ScenePO> scenePOList = manualProjectService.handlePackage(manualProjectTopic, projectId, packageId);
+        List<ScenePO> scenePOList = manualProjectService.handlePackage(manualProjectTopic, userId, projectId, packageId);
         Set<ScenePO> scenePOSet = new HashSet<>(scenePOList);
-        manualProjectMapper.updateTaskNumber(projectId, scenePOList.size());
-        log.info("ManualProjectService--handlePackage 项目" + projectId + " 共有 " + scenePOList.size() + " 个任务,对应 " + scenePOSet.size() + " 个场景!");
         // -------------------------------- 2 查询模型 --------------------------------
         //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
         VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
         List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
         List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
         // -------------------------------- 3 发送任务消息 --------------------------------
-        manualProjectService.sendTaskMessage(manualProjectTopic, projectId, maxSimulationTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
+        manualProjectService.sendTaskMessage(manualProjectTopic, userId, projectId, maxSimulationTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
         // -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
-        String algorithmDockerImage = manualProjectService.handleAlgorithm(manualProjectTopic, projectId, algorithmId);
-        log.info("ManualProjectService--handleAlgorithm 项目使用的算法镜像为:" + algorithmDockerImage);
+        String algorithmDockerImage = manualProjectService.handleAlgorithm(projectId, algorithmId);
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
         int completions = scenePOList.size();     // 结束标
         int parallelism = projectMessageDTO.getParallelism();    // 并行度

+ 10 - 10
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/ManualProjectMapper.java

@@ -14,7 +14,8 @@ public interface ManualProjectMapper {
 
     @Results(id = "project", value = {
             @Result(column = "id", property = "id", jdbcType = JdbcType.VARCHAR),
-            @Result(column = "scene", property = "scenePackageId", jdbcType = JdbcType.VARCHAR)
+            @Result(column = "scene", property = "scenePackageId", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "create_user_id", property = "createUserId", jdbcType = JdbcType.VARCHAR)
     })
 
     @Select("select id, scene\n" +
@@ -22,12 +23,18 @@ public interface ManualProjectMapper {
             "where id = #{projectId}")
     ProjectPO selectById(@Param("projectId")String projectId);
 
+    @Select("select id, create_user_id\n" +
+            "from simulation_manual_project\n" +
+            "where is_deleted = '0'\n" +
+            "  and now_run_state = #{nowRunState}")
+    List<ProjectPO> selectByNowRunState(@Param("nowRunState") String nowRunState);
+
 
     @Update("update simulation_manual_project\n" +
             "set now_run_state  = #{state},\n" +
             "    task_completed = '0'\n" +
             "where id = #{id}")
-    void resetProjectState(@Param("id") String id, @Param("state") String state);
+    void updateInit(@Param("id") String id, @Param("state") String state);
 
     @Update("update simulation_manual_project\n" +
             "set now_run_state  = #{state},\n" +
@@ -48,15 +55,8 @@ public interface ManualProjectMapper {
 
 
 
-
-    @Select("select id\n" +
-            "from simulation_manual_project\n" +
-            "where is_deleted = '0'\n" +
-            "  and now_run_state = #{state}")
-    List<String> selectIdByState(@Param("state") String state);
-
     @Select("select create_user_id\n" +
             "from simulation_manual_project\n" +
             "where id = #{id}")
-    void selectCreateUserById(@Param("id") String id);
+    String selectCreateUserById(@Param("id") String id);
 }

+ 17 - 12
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -6,7 +6,6 @@ import org.apache.ibatis.annotations.*;
 import org.apache.ibatis.type.JdbcType;
 
 import java.sql.Timestamp;
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -24,7 +23,8 @@ public interface TaskMapper {
             @Result(column = "run_state", property = "runState", jdbcType = JdbcType.VARCHAR),
             @Result(column = "run_result_file_path", property = "runResultFilePath", jdbcType = JdbcType.VARCHAR),
             @Result(column = "max_simulation_time", property = "maxSimulationTime", jdbcType = JdbcType.VARCHAR),
-            @Result(column = "last_targer_id", property = "lastTargetId", jdbcType = JdbcType.VARCHAR)
+            @Result(column = "last_targer_id", property = "lastTargetId", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "create_user_id", property = "lastTargetId", jdbcType = JdbcType.VARCHAR)
     })
     @Select("select id,\n" +
             "       p_id,\n" +
@@ -39,6 +39,20 @@ public interface TaskMapper {
     List<TaskPO> selectTaskListByProjectId(@Param("projectId") String projectId);
 
 
+    @Select("select p_id, create_user_id\n" +
+            "from simulation_manual_project_task\n" +
+            "where id = #{taskId}")
+    TaskPO selectById(@Param("taskId")String taskId);
+
+    @ResultMap("task")
+    @Select("select id, p_id, create_user_id\n" +
+            "from simulation_manual_project_task\n" +
+            "where is_deleted = '0'\n" +
+            "  and run_state = 'Running'")
+    List<TaskPO> selectByRunState(@Param("runState") String runState);
+
+
+
     @Update("update simulation_manual_project\n" +
             "set now_run_state = #{state}\n" +
             "where id = (select p_id from simulation_manual_project_task where id = #{taskId})")
@@ -90,12 +104,7 @@ public interface TaskMapper {
             "  where id = #{task.id}")
     void updateSuccessStateAndScoreResultWithStopTime(@Param("task") TaskPO task, @Param("runState") String runState, @Param("runStopTime") Timestamp runStopTime);
 
-    @ResultMap("task")
-    @Select("select id, p_id\n" +
-            "from simulation_manual_project_task smpt\n" +
-            "where smpt.is_deleted = '0'\n" +
-            "  and smpt.run_state in ('Running')")
-    ArrayList<TaskPO> selectExecuting();
+
 
     @Select("select id\n" +
             "from simulation_manual_project_task\n" +
@@ -117,10 +126,6 @@ public interface TaskMapper {
             "where id = #{taskId}")
     void updateStateById(@Param("runState") String runState, @Param("taskId") String taskId);
 
-    @Select("select p_id\n" +
-            "from simulation_manual_project_task\n" +
-            "where id = #{taskId}")
-    String selectProjectIdById(@Param("taskId")String taskId);
 
     @Delete("delete from simulation_manual_project_task where p_id = #{projectId}")
     void deleteByProject(@Param("projectId") String projectId);

+ 10 - 48
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/UserMapper.java

@@ -1,62 +1,24 @@
 package com.css.simulation.resource.scheduler.mapper;
 
 
-import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
+import com.css.simulation.resource.scheduler.pojo.po.UserPO;
 import org.apache.ibatis.annotations.*;
 import org.apache.ibatis.type.JdbcType;
 
-import java.sql.Timestamp;
-import java.util.List;
-
 @Mapper
 public interface UserMapper {
 
 
-    @Results(id = "project", value = {
+    @Results(id = "user", value = {
             @Result(column = "id", property = "id", jdbcType = JdbcType.VARCHAR),
-            @Result(column = "scene", property = "scenePackageId", jdbcType = JdbcType.VARCHAR)
+            @Result(column = "role_code", property = "roleCode", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "use_type", property = "useType", jdbcType = JdbcType.VARCHAR)
     })
+    @Select("select id,\n" +
+            "       role_code,\n" +
+            "       use_type\n" +
+            "from system_user\n" +
+            "where id = #{userId}")
+    UserPO selectById(@Param("userId")String userId);
 
-    @Select("select id, scene\n" +
-            "from simulation_manual_project\n" +
-            "where id = #{projectId}")
-    ProjectPO selectById(@Param("projectId")String projectId);
-
-
-    @Update("update simulation_manual_project\n" +
-            "set now_run_state  = #{state},\n" +
-            "    task_completed = '0'\n" +
-            "where id = #{id}")
-    void resetProjectState(@Param("id") String id, @Param("state") String state);
-
-    @Update("update simulation_manual_project\n" +
-            "set now_run_state  = #{state},\n" +
-            "    finish_time = #{finishTime}\n" +
-            "where id = #{id}")
-    void updateProjectState(@Param("id") String id, @Param("state") String state,@Param("finishTime") Timestamp finishTime);
-
-
-    @Update("update simulation_manual_project\n" +
-            "set task_number = #{taskNumber}, task_completed = 0\n" +
-            "where id = #{id}")
-    void updateTaskNumber(@Param("id") String id, @Param("taskNumber") int taskNumber);
-
-    @Update("update simulation_manual_project\n" +
-            "set task_completed = #{taskCompleted}\n" +
-            "where id = #{id}")
-    void updateTaskCompleted(@Param("id") String id, @Param("taskCompleted") int taskCompleted);
-
-
-
-
-    @Select("select id\n" +
-            "from simulation_manual_project\n" +
-            "where is_deleted = '0'\n" +
-            "  and now_run_state = #{state}")
-    List<String> selectIdByState(@Param("state") String state);
-
-    @Select("select create_user_id\n" +
-            "from simulation_manual_project\n" +
-            "where id = #{id}")
-    void selectCreateUserById(@Param("id") String id);
 }

+ 1 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/po/ProjectPO.java

@@ -13,5 +13,6 @@ public class ProjectPO {
 
     private String id;
     private String scenePackageId;
+    private String createUserId;
 
 }

+ 1 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/po/TaskPO.java

@@ -22,4 +22,5 @@ public class TaskPO extends CommonPO {
     private String scoreExplain;
     private Long maxSimulationTime;
     private String lastTargetId;
+    private String createUserId;
 }

+ 14 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/po/UserPO.java

@@ -0,0 +1,14 @@
+package com.css.simulation.resource.scheduler.pojo.po;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class UserPO {
+    private String id; // id
+    private String roleCode; // 角色
+    private String useType; // 占用类型
+}

+ 45 - 10
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -6,6 +6,7 @@ import api.common.util.StringUtil;
 import api.common.util.TimeUtil;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.service.TaskService;
 import io.kubernetes.client.openapi.ApiClient;
@@ -36,7 +37,7 @@ public class ProjectScheduler {
     @Autowired
     TaskMapper taskMapper;
     @Autowired
-    ManualProjectMapper projectMapper;
+    ManualProjectMapper manualProjectMapper;
     @Value("${scheduler.manual-project.job-yaml}")
     String jobYaml;
     @Value("${scheduler.score.hostname}")
@@ -51,9 +52,42 @@ public class ProjectScheduler {
     KafkaTemplate<String, String> kafkaTemplate;
 
 
+//    /**
+//     * 调度项目启动
+//     * @throws IOException 超时时间
+//     */
+//    @Scheduled(fixedDelay = 60 * 1000)
+//    public void dispatchProject() throws IOException {
+//        //1 等待执行的项目
+//
+//
+//
+//        long timeout = 2 * 60 * 1000L;
+//
+//        SshClient client = SshUtil.getClient();
+//        ClientSession session = SshUtil.getSession(client, hostname, username, password);
+//        List<TaskPO> executingTaskList = taskMapper.selectExecuting();
+//
+//        log.info("ProjectScheduler--taskTimeout 正在运行的任务有:" + executingTaskList);
+//        if (executingTaskList != null && executingTaskList.size() > 0) {
+//            for (TaskPO task : executingTaskList) {
+//                String taskId = task.getId();
+//                String projectId = task.getPId();
+//                long lastTickTime = Long.parseLong(Objects.requireNonNull(redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick")));
+//                if (TimeUtil.getNow() - lastTickTime > timeout) {
+//                    String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
+//                    taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
+//                }
+//            }
+//        }
+//        session.close();
+//        client.stop();
+//    }
+
+
     /**
      * 处理 pod 超时
-     * 同时也可处理 pod 莫名关闭
+     * 同时也可处理 pod 莫名关闭,因为关闭之后也会超时
      *
      * @throws IOException 超时时间
      */
@@ -64,16 +98,16 @@ public class ProjectScheduler {
 
         SshClient client = SshUtil.getClient();
         ClientSession session = SshUtil.getSession(client, hostname, username, password);
-        List<TaskPO> executingTaskList = taskMapper.selectExecuting();
-
+        List<TaskPO> executingTaskList = taskMapper.selectByRunState(DictConstants.TASK_RUNNING);
         log.info("ProjectScheduler--taskTimeout 正在运行的任务有:" + executingTaskList);
         if (executingTaskList != null && executingTaskList.size() > 0) {
             for (TaskPO task : executingTaskList) {
                 String taskId = task.getId();
                 String projectId = task.getPId();
-                long lastTickTime = Long.parseLong(Objects.requireNonNull(redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick")));
+                String userId = task.getCreateUserId();
+                long lastTickTime = Long.parseLong(Objects.requireNonNull(redisTemplate.opsForValue().get(manualProjectTopic+ ":" + userId + ":" + projectId + ":task" + taskId + ":tick")));
                 if (TimeUtil.getNow() - lastTickTime > timeout) {
-                    String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
+                    String podName = redisTemplate.opsForValue().get(manualProjectTopic+ ":" + userId + ":" + projectId + ":task:" + taskId + ":pod");
                     taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
                 }
             }
@@ -92,13 +126,14 @@ public class ProjectScheduler {
         ClientSession session = SshUtil.getSession(client, hostname, username, password);
 
         //1 查询出正在运行中的 project
-        List<String> projectIdList = projectMapper.selectIdByState("20");
+        List<ProjectPO> projectIdList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
         log.info("ProjectScheduler--projectCheck 运行中的项目有:" + projectIdList);
         //2 根据 projectId 获取 pod
-        projectIdList.forEach(projectId -> {
+        projectIdList.forEach(project -> {
+            String projectId = project.getId();
+            String userId = project.getCreateUserId();
             try {
-
-                String checkKey = manualProjectTopic + ":" + projectId + ":check";
+                String checkKey = manualProjectTopic+ ":" + userId + ":" + projectId + ":check";
                 String lastNowString = redisTemplate.opsForValue().get(checkKey);
                 String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
                 log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);

+ 92 - 99
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ManualProjectService.java

@@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -34,9 +35,6 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ManualProjectService {
 
-
-    private static final String USER_ID = "simulation-resource-scheduler";
-
     @Autowired
     private SqlSessionFactory sqlSessionFactory;
     @Autowired
@@ -56,70 +54,69 @@ public class ManualProjectService {
     @Value("${minio.bucket-name}")
     String bucketName;
 
-    // -------------------------------- Comment --------------------------------
+    @Autowired
+    ManualProjectMapper manualProjectMapper;
+    @Autowired
+    TaskMapper taskMapper;
+    @Autowired
+    IndexTemplateMapper indexTemplateMapper;
+    @Autowired
+    SceneMapper sceneMapper;
+    @Autowired
+    AlgorithmMapper algorithmMapper;
 
-    public void prepare(String manualProjectTopic, String projectId, String projectJson) {
+    // -------------------------------- Comment --------------------------------
 
+    @Transactional
+    public void prepare(String manualProjectTopic, String userId, String projectId, String projectJson) {
         //1 redis 设置项目已完成任务为 0
         Set<String> oldKeys = stringRedisTemplate.keys(manualProjectTopic + ":" + projectId + "*");
-        assert oldKeys != null;
-        stringRedisTemplate.delete(oldKeys);
-        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":completed", "0");
-        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":start", projectJson);
-
-        try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
-            ManualProjectMapper manualProjectMapper = sqlSession.getMapper(ManualProjectMapper.class);
-            TaskMapper taskMapper = sqlSession.getMapper(TaskMapper.class);
-            manualProjectMapper.resetProjectState(projectId, DictConstants.PROJECT_RUNNING);   // 修改该 project 的状态为执行中,同时将已完成任务重置为 0 。
-            taskMapper.deleteByProject(projectId); // 将该 project 下所有任务删除。
-            sqlSession.commit();
+        if (CollectionUtil.isNotEmpty(oldKeys)) {
+            stringRedisTemplate.delete(oldKeys);
         }
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":completed", "0");
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":start", projectJson);
+        manualProjectMapper.updateInit(projectId, DictConstants.PROJECT_RUNNING);   // 修改该 project 的状态为执行中,同时将已完成任务重置为 0 。
+        taskMapper.deleteByProject(projectId); // 将该 project 下所有任务删除。
     }
 
+    @SneakyThrows
+    @Transactional
+    public List<ScenePO> handlePackage(String manualProjectTopic, String userId, String projectId, String packageId) {
 
-    public List<ScenePO> handlePackage(String manualProjectTopic, String projectId, String packageId) {
-
-        try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
-            IndexTemplateMapper indexTemplateMapper = sqlSession.getMapper(IndexTemplateMapper.class);
-            SceneMapper sceneMapper = sqlSession.getMapper(SceneMapper.class);
-            //1 查询该场景包的所有指标列表存入 redis
-            List<IndexTemplatePO> allIndexList = indexTemplateMapper.selectByPackageIdIncludeDeleted(packageId);
-            stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + packageId + ":all", JsonUtil.listToJson(allIndexList));
-
-            //2 查询场景包叶子指标
-            List<IndexTemplatePO> leafIndexList = allIndexList.stream().filter(index -> StringUtil.isNotEmpty(index.getRuleId())).collect(Collectors.toList());
-            stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + packageId + ":leaf", JsonUtil.listToJson(leafIndexList));
-            log.info("ManualProjectService--handlePackage 项目 " + projectId + " 的叶子指标为:" + leafIndexList);
-            List<ScenePO> sceneList = new ArrayList<>();
-            leafIndexList.forEach(leafIndex -> {
-                String naturalIds = leafIndex.getSceneNaturalIds();
-                String standardIds = leafIndex.getSceneStatueIds();
-                String accidentIds = leafIndex.getSceneTrafficIds();
-                if (StringUtil.isNotEmpty(naturalIds)) {
-                    List<String> naturalIdList = new ArrayList<>(Arrays.asList(naturalIds.split(",")));
-                    sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
-                }
-                if (StringUtil.isNotEmpty(standardIds)) {
-                    List<String> standardIdList = new ArrayList<>(Arrays.asList(standardIds.split(",")));
-                    sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
-                }
-                if (StringUtil.isNotEmpty(accidentIds)) {
-                    List<String> accidentIdList = new ArrayList<>(Arrays.asList(accidentIds.split(",")));
-                    sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
-                }
-            });
-            sqlSession.commit();
-            return sceneList;
-
-        } catch (Exception e) {
-            throw new RuntimeException("ManualProjectService--handlePackage 场景包处理出错:" + e.getMessage());
-        }
-
+        //1 查询该场景包的所有指标列表存入 redis
+        List<IndexTemplatePO> allIndexList = indexTemplateMapper.selectByPackageIdIncludeDeleted(packageId);
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":package:" + packageId + ":all", JsonUtil.listToJson(allIndexList));
 
+        //2 查询场景包叶子指标
+        List<IndexTemplatePO> leafIndexList = allIndexList.stream().filter(index -> StringUtil.isNotEmpty(index.getRuleId())).collect(Collectors.toList());
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":package:" + packageId + ":leaf", JsonUtil.listToJson(leafIndexList));
+        log.info("ManualProjectService--handlePackage 项目 " + projectId + " 的叶子指标为:" + leafIndexList);
+        List<ScenePO> sceneList = new ArrayList<>();
+        leafIndexList.forEach(leafIndex -> {
+            String naturalIds = leafIndex.getSceneNaturalIds();
+            String standardIds = leafIndex.getSceneStatueIds();
+            String accidentIds = leafIndex.getSceneTrafficIds();
+            if (StringUtil.isNotEmpty(naturalIds)) {
+                List<String> naturalIdList = new ArrayList<>(Arrays.asList(naturalIds.split(",")));
+                sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
+            }
+            if (StringUtil.isNotEmpty(standardIds)) {
+                List<String> standardIdList = new ArrayList<>(Arrays.asList(standardIds.split(",")));
+                sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
+            }
+            if (StringUtil.isNotEmpty(accidentIds)) {
+                List<String> accidentIdList = new ArrayList<>(Arrays.asList(accidentIds.split(",")));
+                sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
+            }
+        });
+        manualProjectMapper.updateTaskNumber(projectId, sceneList.size());
+        log.info("ManualProjectService--handlePackage 项目" + projectId + " 共有 " + sceneList.size() + " 个任务,对应 " + sceneList.size() + " 个场景!");
+        return sceneList;
     }
 
 
-    public void sendTaskMessage(String manualProjectTopic, String projectId, int maxSimulationTime, Set<ScenePO> scenePOSet, VehiclePO vehiclePO, List<CameraPO> cameraPOList, List<OgtPO> ogtPOList) {
+    public void sendTaskMessage(String manualProjectTopic, String userId, String projectId, int maxSimulationTime, Set<ScenePO> scenePOSet, VehiclePO vehiclePO, List<CameraPO> cameraPOList, List<OgtPO> ogtPOList) {
 
         try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
             IndexMapper indexMapper = sqlSession.getMapper(IndexMapper.class);
@@ -133,7 +130,7 @@ public class ManualProjectService {
                 lastTargetIdList.forEach(lastTargetId -> {
                     String taskId = StringUtil.getRandomUUID();
                     // 设置任务重试次数为 0,方便任务进行最大3次的重试。
-                    stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry", "0");
+                    stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":retry", "0");
                     // 保存任务信息
                     TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
                             .id(taskId)
@@ -146,9 +143,9 @@ public class ManualProjectService {
                             .runResultFilePath(resultPathMinio + projectId + "/" + taskId)
                             .build();
                     taskPO.setCreateTime(TimeUtil.getNowForMysql());
-                    taskPO.setCreateUserId(USER_ID);
+                    taskPO.setCreateUserId(userId);
                     taskPO.setModifyTime(TimeUtil.getNowForMysql());
-                    taskPO.setModifyUserId(USER_ID);
+                    taskPO.setModifyUserId(userId);
                     taskPO.setModifyTime(TimeUtil.getNowForMysql());
                     taskPO.setIsDeleted("0");
                     taskMapper.insert(taskPO);
@@ -222,7 +219,7 @@ public class ManualProjectService {
                         log.error("------- 发送消息失败:" + failure.getMessage());
                     });
                     messageNumber[0] = messageNumber[0] + 1;
-                    stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":message", taskJson);
+                    stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":message", taskJson);
                 });
             });
             log.info("------- ManualProjectConsumer 共发送了" + messageNumber[0] + " 条消息!");
@@ -236,27 +233,25 @@ public class ManualProjectService {
 
 
     @SneakyThrows
-    public String handleAlgorithm(String manualProjectTopic, String projectId, String algorithmId) {
+    @Transactional
+    public String handleAlgorithm(String projectId, String algorithmId) {
         String dockerImage;
-        try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
-            AlgorithmMapper algorithmMapper = sqlSession.getMapper(AlgorithmMapper.class);
-            TaskMapper taskMapper = sqlSession.getMapper(TaskMapper.class);
-            //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
-            AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
-            String algorithmTarLinuxTempPath = linuxTempPath + "algorithm/" + algorithmId + ".tar";
-            dockerImage = "algorithm_" + algorithmId + ":latest";
-            if (algorithmPO == null) {
-                // 访问索为远程接口
-                String tokenUri = "http://open-api.zoogooy.com/cgi-bin/token/token?grant_type=client_credential";
-                String appid = "3e64be4a29e5478f9717d53c11ab26ad";
-                String secret = "f183079f97ac9ed81a864619a83fc17a";
-                String tokenUrl = tokenUri + "&appid=" + appid + "&secret=" + secret;
-                String result = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
-                ObjectMapper objectMapper = new ObjectMapper();
-                JsonNode jsonNode = objectMapper.readTree(result);
-                JsonNode dataNode = jsonNode.path("data");
+        //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
+        AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
+        String algorithmTarLinuxTempPath = linuxTempPath + "algorithm/" + algorithmId + ".tar";
+        dockerImage = "algorithm_" + algorithmId + ":latest";
+        if (algorithmPO == null) {
+            // 访问索为远程接口
+            String tokenUri = "http://open-api.zoogooy.com/cgi-bin/token/token?grant_type=client_credential";
+            String appid = "3e64be4a29e5478f9717d53c11ab26ad";
+            String secret = "f183079f97ac9ed81a864619a83fc17a";
+            String tokenUrl = tokenUri + "&appid=" + appid + "&secret=" + secret;
+            String result = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
+            ObjectMapper objectMapper = new ObjectMapper();
+            JsonNode jsonNode = objectMapper.readTree(result);
+            JsonNode dataNode = jsonNode.path("data");
 
-                String token = dataNode.path("access_token").asText();
+            String token = dataNode.path("access_token").asText();
 
             /*
             {
@@ -287,31 +282,29 @@ public class ManualProjectService {
               "nowTime" : "2022-04-22 10:14:40"
             }
              */
-                String downloadUrl = "http://open-api.zoogooy.com/cgi-bin/api/icv-algorithm-agg/simulation/download"
-                        + "?access_token=" + token
-                        + "&id=" + algorithmId;
-                String tempDownloadUrl = HttpUtil.get(closeableHttpClient, requestConfig, downloadUrl);
-                InputStream inputStream = HttpUtil.getInputStream(closeableHttpClient, requestConfig, tempDownloadUrl);
-                FileUtil.writeInputStreamToLocalFile(inputStream, algorithmTarLinuxTempPath);
+            String downloadUrl = "http://open-api.zoogooy.com/cgi-bin/api/icv-algorithm-agg/simulation/download"
+                    + "?access_token=" + token
+                    + "&id=" + algorithmId;
+            String tempDownloadUrl = HttpUtil.get(closeableHttpClient, requestConfig, downloadUrl);
+            InputStream inputStream = HttpUtil.getInputStream(closeableHttpClient, requestConfig, tempDownloadUrl);
+            FileUtil.writeInputStreamToLocalFile(inputStream, algorithmTarLinuxTempPath);
+            LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
+        } else {
+            String minioPath = algorithmPO.getMinioPath();
+            if ("0".equals(algorithmPO.getDockerImport()) || algorithmPO.getDockerImport() == null) {
+                // 下载算法文件到本地( 2 到仓库服务器)
+                MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
+                //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
                 LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
+                algorithmMapper.updateDockerImportAndDockerImageById("1", dockerImage, algorithmId);
+            } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
+                dockerImage = algorithmPO.getDockerImage();
             } else {
-                String minioPath = algorithmPO.getMinioPath();
-                if ("0".equals(algorithmPO.getDockerImport()) || algorithmPO.getDockerImport() == null) {
-                    // 下载算法文件到本地( 2 到仓库服务器)
-                    MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
-                    //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
-                    LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
-                    algorithmMapper.updateDockerImportAndDockerImageById("1", dockerImage, algorithmId);
-                } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
-                    dockerImage = algorithmPO.getDockerImage();
-                } else {
-                    throw new RuntimeException("算法 " + algorithmId + " 的 mysql 数据有误!");
-                }
+                throw new RuntimeException("算法 " + algorithmId + " 的 mysql 数据有误!");
             }
-            sqlSession.commit();
         }
-        return dockerImage;
-
 
+        log.info("ManualProjectService--handleAlgorithm 项目 " + projectId + " 使用的算法镜像为:" + dockerImage);
+        return dockerImage;
     }
 }

+ 22 - 21
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -38,9 +38,6 @@ import java.util.stream.Collectors;
 @Slf4j
 public class TaskService {
 
-    private final String USER_ID = "simulation-resource-scheduler";
-
-
     @Autowired
     CloseableHttpClient closeableHttpClient;
     @Autowired
@@ -95,12 +92,14 @@ public class TaskService {
 
     @SneakyThrows
     public void taskState(String taskId, String state, String podName) {
-        String projectId = taskMapper.selectProjectIdById(taskId);
+        TaskPO taskPO = taskMapper.selectById(taskId);
+        String projectId = taskPO.getPId();
+        String userId = taskPO.getCreateUserId();
         SshClient client = SshUtil.getClient();
         ClientSession session = SshUtil.getSession(client, hostname, username, password);
         if ("Running".equals(state)) {
             // 将运行中的任务的 pod 名称放入 redis
-            stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod", podName);
+            stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":pod", podName);
             log.info("TaskService--state 修改任务 " + taskId + "的状态为 Running,pod 名称为:" + podName);
             taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
             return;
@@ -108,7 +107,7 @@ public class TaskService {
             String podDeleteCommand = "kubectl delete pod " + podName;
             log.info("TaskService--state 修改任务 " + taskId + "的状态为:" + state + ",pod 名称为:" + podName + ",并执行删除 pod 命令:" + podDeleteCommand);
             if ("Aborted".equals(state)) {
-                if (retry(projectId, taskId)) {
+                if (retry(userId, projectId, taskId)) {
                     taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
                     return;
                 }
@@ -135,7 +134,7 @@ public class TaskService {
                 }
                 taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), targetEvaluate);
             } else if ("Terminated".equals(state)) {
-                if (retry(projectId, taskId)) {
+                if (retry(userId, projectId, taskId)) {
                     taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
                     return;
                 }
@@ -143,7 +142,7 @@ public class TaskService {
             } else if ("PendingAnalysis".equals(state)) {
                 taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
             } else {
-                if (retry(projectId, taskId)) {
+                if (retry(userId, projectId, taskId)) {
                     taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
                     return;
                 }
@@ -169,8 +168,8 @@ public class TaskService {
         indexMapper.deleteFirstByProjectId(projectId);
         indexMapper.deleteLastByProjectId(projectId);
         //1 查询场景包对应指标
-        List<IndexTemplatePO> allIndexList = JsonUtil.jsonToList(stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + packageId + ":all"), IndexTemplatePO.class);
-        List<IndexTemplatePO> leafIndexList = JsonUtil.jsonToList(stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + packageId + ":leaf"), IndexTemplatePO.class);
+        List<IndexTemplatePO> allIndexList = JsonUtil.jsonToList(stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":package:" + packageId + ":all"), IndexTemplatePO.class);
+        List<IndexTemplatePO> leafIndexList = JsonUtil.jsonToList(stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":package:" + packageId + ":leaf"), IndexTemplatePO.class);
         int maxLevel = 1; // 用于计算指标得分
         List<LeafIndexPO> leafTaskIndexList = new ArrayList<>();
         log.info("TaskService--state 共有 " + leafIndexList.size() + "个叶子节点!");
@@ -237,7 +236,7 @@ public class TaskService {
                     taskOfLeaf.setScore(score.getUnit_scene_score());
                     taskOfLeaf.setTargetEvaluate(score.getEvaluate_item());
                     taskOfLeaf.setScoreExplain(score.getScore_description());
-                    taskOfLeaf.setModifyUserId(USER_ID);
+                    taskOfLeaf.setModifyUserId(userId);
                     taskOfLeaf.setModifyTime(TimeUtil.getNowForMysql());
                     scoreExplain.set(score.getScore_description());
 
@@ -281,9 +280,9 @@ public class TaskService {
                     .scoreExplain(scoreExplain.get())
                     .packageLevel(packageLevel)
                     .build();
-            leafTaskIndex.setCreateUserId(USER_ID);
+            leafTaskIndex.setCreateUserId(userId);
             leafTaskIndex.setCreateTime(TimeUtil.getNowForMysql());
-            leafTaskIndex.setModifyUserId(USER_ID);
+            leafTaskIndex.setModifyUserId(userId);
             leafTaskIndex.setModifyTime(TimeUtil.getNowForMysql());
             leafTaskIndex.setIsDeleted("0");
 
@@ -329,17 +328,17 @@ public class TaskService {
     }
 
 
-    public boolean retry(String projectId, String taskId) {
+    public boolean retry(String userId, String projectId, String taskId) {
         //1 首先查看任务是否重试过 3 次
-        int retry = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry")));
+        int retry = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":retry")));
         //2 如果重试次数没有超过 3 次,则重试
         if (retry > 3) {
             return false;
         }
-        String taskJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":message");
+        String taskJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":message");
         retry++;
         log.info("TaskService--retry 重试项目 " + projectId + " 的任务 " + taskId + ",重试次数为:" + retry + ",重新发送的消息为:" + taskJson);
-        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry", retry + "");
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":retry", retry + "");
         kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
             // 消息发送到的topic
             assert success != null;
@@ -368,9 +367,9 @@ public class TaskService {
         while (leafTaskIndexIterator.hasNext()) {
             LeafIndexPO leafTaskIndex = leafTaskIndexIterator.next();
             if (leafTaskIndex.getPackageLevel() == 1) {
-                leafTaskIndex.setCreateUserId(USER_ID);
+                leafTaskIndex.setCreateUserId(leafTaskIndex.getCreateUserId());
                 leafTaskIndex.setCreateTime(TimeUtil.getNowForMysql());
-                leafTaskIndex.setModifyUserId(USER_ID);
+                leafTaskIndex.setModifyUserId(leafTaskIndex.getModifyUserId());
                 leafTaskIndex.setModifyTime(TimeUtil.getNowForMysql());
                 leafTaskIndex.setIsDeleted("0");
                 indexMapper.insertFirstIndex(leafTaskIndex);
@@ -444,9 +443,11 @@ public class TaskService {
 
     public void taskTick(String taskId) {
         log.info("TaskService--taskTick 任务 " + taskId + "心跳!");
-        String projectId = taskMapper.selectProjectIdById(taskId);
+        TaskPO taskPO = taskMapper.selectById(taskId);
+        String projectId = taskPO.getPId();
+        String userId = taskPO.getCreateUserId();
         // 刷新 redis 心跳时间
-        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick", TimeUtil.getNowString());
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":tick", TimeUtil.getNowString());
     }