martin vor 3 Jahren
Ursprung
Commit
4a914fa560

+ 29 - 29
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -1,29 +1,29 @@
-package com.css.simulation.resource.scheduler.manager;
-
-import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
-import org.apache.ibatis.session.ExecutorType;
-import org.apache.ibatis.session.SqlSession;
-import org.apache.ibatis.session.SqlSessionFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-
-@Component
-public class TaskManager {
-
-    @Autowired
-    private SqlSessionFactory sqlSessionFactory;
-
-    public void batchUpdateByScoreResult(List<TaskPO> taskList) {
-
-        try(SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false)){
-            TaskMapper taskMapper = sqlSession.getMapper(TaskMapper.class);
-            for (TaskPO taskPO : taskList) {
-                taskMapper.updateByScoreResult(taskPO);
-            }
-            sqlSession.commit();
-        }
-    }
-}
+//package com.css.simulation.resource.scheduler.manager;
+//
+//import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+//import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
+//import org.apache.ibatis.session.ExecutorType;
+//import org.apache.ibatis.session.SqlSession;
+//import org.apache.ibatis.session.SqlSessionFactory;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.stereotype.Component;
+//
+//import java.util.List;
+//
+//@Component
+//public class TaskManager {
+//
+//    @Autowired
+//    private SqlSessionFactory sqlSessionFactory;
+//
+//    public void batchUpdateByScoreResult(List<TaskPO> taskList) {
+//
+//        try(SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false)){
+//            TaskMapper taskMapper = sqlSession.getMapper(TaskMapper.class);
+//            for (TaskPO taskPO : taskList) {
+//                taskMapper.updateByScoreResult(taskPO);
+//            }
+//            sqlSession.commit();
+//        }
+//    }
+//}

+ 12 - 14
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -41,24 +41,18 @@ public interface TaskMapper {
     @Update("update simulation_manual_project_task\n" +
             "set run_state = #{runState},run_start_time = #{runStartTime}\n" +
             "where id = #{id}")
-    void updateStateWithStartTime(@Param("id") String id, @Param("runState") String runState,@Param("runStartTime") Timestamp runStartTime);
+    void updateStateWithStartTime(@Param("id") String id, @Param("runState") String runState, @Param("runStartTime") Timestamp runStartTime);
 
     @Update("update simulation_manual_project_task\n" +
             "set run_state = #{runState},run_end_time = #{runStopTime},run_result='Success'\n" +
             "where id = #{id}")
-    void updateSuccessStateWithStopTime(@Param("id") String id, @Param("runState") String runState,@Param("runStopTime") Timestamp runStopTime);
+    void updateSuccessStateWithStopTime(@Param("id") String id, @Param("runState") String runState, @Param("runStopTime") Timestamp runStopTime);
 
     @Update("update simulation_manual_project_task\n" +
             "set run_state = #{runState},run_end_time = #{runStopTime},run_result='Failed'\n" +
             "where id = #{id}" +
             " and score is null")
-    void updateFailStateWithStopTime(@Param("id") String id, @Param("runState") String runState,@Param("runStopTime") Timestamp runStopTime);
-
-    @Update("update simulation_manual_project_task\n" +
-            "set run_state = #{runState}\n" +
-            "where id = #{id}")
-    void updateState(@Param("id") String id, @Param("runState") String runState);
-
+    void updateFailStateWithStopTime(@Param("id") String id, @Param("runState") String runState, @Param("runStopTime") Timestamp runStopTime);
 
     @Insert("insert into simulation_manual_project_task(id, p_id, scene_id, scene_name, scene_type, last_targer_id,\n" +
             "                                           run_state, run_result_file_path, create_time, create_user_id, modify_time,\n" +
@@ -73,13 +67,17 @@ public interface TaskMapper {
     String selectStateById(@Param("id") String id);
 
     @Update("update simulation_manual_project_task\n" +
-            "set return_scene_id = #{task.returnSceneId},\n" +
+            "set run_state = #{runState}," +
+            "    run_end_time = #{runStopTime}," +
+            "    run_result='Success'" +
+            "    return_scene_id = #{task.returnSceneId},\n" +
             "    score           = #{task.score},\n" +
             "    target_evaluate = #{task.targetEvaluate},\n" +
             "    score_explain   = #{task.scoreExplain},\n" +
             "    modify_user_id  = #{task.modifyUserId},\n" +
-            "    modify_time     = #{task.modifyTime}")
-    void updateByScoreResult(@Param("task") TaskPO task);
+            "    modify_time     = #{task.modifyTime}" +
+            "where id = {task.id}")
+    void updateSuccessStateAndScoreResultWithStopTime(@Param("task") TaskPO task, @Param("runState") String runState, @Param("runStopTime") Timestamp runStopTime);
 
     @ResultMap("task")
     @Select("select smpt.id, smpt.p_id, smp.max_simulation_time\n" +
@@ -93,7 +91,7 @@ public interface TaskMapper {
     @Select("select id\n" +
             "from simulation_manual_project_task\n" +
             "where p_id = #{projectId} and scene_id = #{sceneId}")
-    List<String> selectIdByProjectIdAndSceneId(@Param("projectId")String projectId,@Param("sceneId") String sceneId);
+    List<String> selectIdByProjectIdAndSceneId(@Param("projectId") String projectId, @Param("sceneId") String sceneId);
 
     @Select("select last_targer_id\n" +
             "from simulation_manual_project_task\n" +
@@ -108,6 +106,6 @@ public interface TaskMapper {
     @Update("update simulation_manual_project_task\n" +
             "set run_state = #{runState}\n" +
             "where id = #{taskId}")
-    void updateStateById(@Param("runState") String runState,@Param("taskId") String taskId);
+    void updateStateById(@Param("runState") String runState, @Param("taskId") String taskId);
 
 }

+ 1 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -56,12 +56,11 @@ public class TickScheduler {
 //                log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
                 if (difference > maxSimulationTime) {
 //                    log.info("------- TickScheduler 任务" + taskId + "已超时,状态修改为:" + DictConstants.TASK_ABORTED);
-                    taskMapper.updateState(taskId, DictConstants.TASK_ABORTED);
+                    taskMapper.updateFailStateWithStopTime(taskId, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql());
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e.getMessage());
             }
-
         });
 
     }

+ 91 - 82
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -3,7 +3,6 @@ package com.css.simulation.resource.scheduler.service;
 import api.common.pojo.constants.DictConstants;
 import api.common.util.*;
 import com.css.simulation.resource.scheduler.manager.TaskIndexManager;
-import com.css.simulation.resource.scheduler.manager.TaskManager;
 import com.css.simulation.resource.scheduler.mapper.IndexTemplateMapper;
 import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskIndexMapper;
@@ -15,8 +14,6 @@ import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.pojo.to.ScoreTO;
 import com.css.simulation.resource.scheduler.util.MinioUtil;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -53,8 +50,8 @@ public class TaskService {
     StringRedisTemplate redisTemplate;
     @Autowired
     ProjectMapper projectMapper;
-    @Autowired
-    TaskManager taskManager;
+//    @Autowired
+//    TaskManager taskManager;
     @Autowired
     TaskMapper taskMapper;
     @Autowired
@@ -115,7 +112,7 @@ public class TaskService {
             LinuxUtil.execute("kubectl delete pod " + podName);
             taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
         } else {
-            taskMapper.updateState(taskId, state);
+            taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
         }
         ProjectPO projectPO = projectMapper.selectById(taskId);
         if (projectPO == null) {
@@ -166,79 +163,91 @@ public class TaskService {
             List<TaskPO> taskListOfLeafIndex = taskList.stream()
                     .filter(task -> indexId.equals(task.getLastTargetId()) && DictConstants.TASK_ANALYSIS.equals(task.getRunState()))
                     .collect(Collectors.toList());
-            int resultNumberOfCurrentIndex = taskListOfLeafIndex.size();
-            log.info("------- /state 叶子节点 " + indexId + " 包括 " + resultNumberOfCurrentIndex + " 个任务!");
-            log.info("------- /state 计算叶子节点 " + indexId + " 的得分!");
-            double sum = taskListOfLeafIndex.stream()
-                    .mapToDouble(task2 -> {
-                        String runState = task2.getRunState();
-                        if (!DictConstants.TASK_ANALYSIS.equals(runState)) {
-                            return 0.0;
-                        } else {
-                            String task2Id = task2.getId();
-                            taskMapper.updateState(task2Id, DictConstants.TASK_ANALYSING);
-                            // 计算每个任务的得分
 
-                            String runResultMinio = task2.getRunResultFilePath() + "/Ego.csv";
-                            String runResultLinux = linuxTempPath + runResultMinio;
+            log.info("------- /state 叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个成功运行结束任务!");
+            log.info("------- /state 计算叶子节点 " + indexId + " 的得分:" + taskListOfLeafIndex);
+            taskListOfLeafIndex.forEach(task2 -> {
+                String runState = task2.getRunState();
+                if (DictConstants.TASK_ANALYSIS.equals(runState)) {
+                    String task2Id = task2.getId();
+                    taskMapper.updateSuccessStateWithStopTime(task2Id, DictConstants.TASK_ANALYSING, TimeUtil.getNowForMysql());
+                    // 计算每个任务的得分
+
+                    String runResultMinio = task2.getRunResultFilePath() + "/Ego.csv";
+                    String runResultLinux = linuxTempPath + runResultMinio;
 
 //                        python3 /home/ubuntu/test/Evaluate/main.py /home/ubuntu/test/test_data.csv 4 AEB_1-2
 //                        String command = "python3 " + pyPath + " " + runResultLinux + " " + task2.getSceneType();  // 默认使用场景名称找打分脚本
-                            String command = "python3 " + pyPath + "main.py " + runResultLinux + " " + task2.getSceneType() + " " + ruleName; // 指定打分脚本
-                            String scoreResult;
-                            try {
-                                try {
-                                    log.info("------- /state 下载 minio 上的结果文件 " + runResultMinio + " 到本地:" + runResultLinux);
-                                    MinioUtil.downloadToFile(minioClient, bucketName, runResultMinio, runResultLinux);  // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
-                                } catch (Exception e) {
-                                    throw new RuntimeException("------- /state 下载 minio 上的结果文件出错:" + e.getMessage());
-                                }
-                                try {
-                                    log.info("------- /state 开始执行打分命令:" + command);
-                                    scoreResult = SshUtil.execute(sessionScore, command);
-                                    log.info("------- /state 打分结束,结果为:" + scoreResult);
-                                } catch (IOException e) {
-                                    throw new RuntimeException("------- /state 任务 " + task2Id + " 打分出错,命令为:" + command + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
-                                }
-                            } catch (Exception e) {
-                                taskMapper.updateState(task2Id, DictConstants.TASK_ABORTED);
-                                throw new RuntimeException(e.getMessage());
-                            }
-                            ScoreTO score = null;
-                            try {
-                                String replace = StringUtil.replace(scoreResult, "'", "\"");
-                                score = JsonUtil.jsonToBean(replace, ScoreTO.class);
-                            } catch (JsonProcessingException e) {
-                                e.printStackTrace();
-                            }
-                            assert score != null;
-                            task2.setReturnSceneId(score.getUnit_scene_ID());
-                            task2.setScore(score.getUnit_scene_score());
-                            task2.setTargetEvaluate(score.getEvaluate_item());
-                            task2.setScoreExplain(score.getScore_description());
-                            task2.setModifyUserId(USER_ID);
-                            task2.setModifyTime(TimeUtil.getNowForMysql());
-                            scoreExplain.set(score.getScore_description());
+                    String command = "python3 " + pyPath + "main.py " + runResultLinux + " " + task2.getSceneType() + " " + ruleName; // 指定打分脚本
+                    String scoreResult;
+                    try {
+                        try {
+                            log.info("------- /state 下载 minio 上的结果文件 " + runResultMinio + " 到本地:" + runResultLinux);
+                            MinioUtil.downloadToFile(minioClient, bucketName, runResultMinio, runResultLinux);  // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
+                        } catch (Exception e) {
+                            throw new RuntimeException("------- /state 下载 minio 上的结果文件出错:" + e.getMessage());
+                        }
+                        try {
+                            log.info("------- /state 开始执行打分命令:" + command);
+                            scoreResult = SshUtil.execute(sessionScore, command);
+                            log.info("------- /state 打分结束,结果为:" + scoreResult);
+                        } catch (IOException e) {
+                            throw new RuntimeException("------- /state 任务 " + task2Id + " 打分出错,命令为:" + command + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
+                        }
+                    } catch (Exception e) {
+                        task2.setRunState(DictConstants.TASK_ABORTED);
+                        taskMapper.updateFailStateWithStopTime(task2Id, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql());
+                        throw new RuntimeException(e.getMessage());
+                    }
+                    ScoreTO score = null;
+                    try {
+                        String replace = StringUtil.replace(scoreResult, "'", "\"");
+                        score = JsonUtil.jsonToBean(replace, ScoreTO.class);
+                    } catch (JsonProcessingException e) {
+                        e.printStackTrace();
+                    }
+                    assert score != null;
+                    task2.setReturnSceneId(score.getUnit_scene_ID());
+                    task2.setScore(score.getUnit_scene_score());
+                    task2.setTargetEvaluate(score.getEvaluate_item());
+                    task2.setScoreExplain(score.getScore_description());
+                    task2.setModifyUserId(USER_ID);
+                    task2.setModifyTime(TimeUtil.getNowForMysql());
+                    scoreExplain.set(score.getScore_description());
 
-                            taskMapper.updateState(task2Id, DictConstants.TASK_COMPLETED);
+                    task2.setRunState(DictConstants.TASK_COMPLETED);
+                    taskMapper.updateSuccessStateAndScoreResultWithStopTime(
+                            task2,
+                            DictConstants.TASK_COMPLETED,
+                            TimeUtil.getNowForMysql()
+                    );
+                }
+            });
 
-                            return score.getUnit_scene_score();
-                        }
 
-                    }).sum();
             // 计算不合格的任务数(不到100分就是不合格,执行失败的不算)
-            long notStandardSceneNum = taskListOfLeafIndex.stream()
+            long notStandardSceneNumber = taskListOfLeafIndex.stream()
                     .filter(task -> task.getScore() < 100 && task.getScore() != 0.0 && DictConstants.TASK_COMPLETED.equals(task.getRunState()))
                     .count();
-            // 叶子指标的得分(叶子指标下所有场景得分的平均值,执行失败的不算)
-            double leafIndexScore = (int) (sum / resultNumberOfCurrentIndex * 100) / 100.0;
+
+            // 计算总分
+            double leafSum = taskListOfLeafIndex.stream()
+                    .filter(task -> DictConstants.TASK_COMPLETED.equals(task.getRunState()))
+                    .mapToDouble(TaskPO::getScore)
+                    .sum();
+            // 计算成功执行的个数
+            long resultNumberOfCurrentIndex = taskListOfLeafIndex.stream()
+                    .filter(task -> DictConstants.TASK_COMPLETED.equals(task.getRunState()))
+                    .count();
+            log.info("------- /state 项目 " + projectId + " 的叶子指标" + indexId + "下成功执行的场景数量为:" + resultNumberOfCurrentIndex);
+            double leafIndexScore = resultNumberOfCurrentIndex == 0 ? 0 : ((long) (leafSum / resultNumberOfCurrentIndex) * 100L) / 100.0;
             // -------------------------------- 保存叶子指标得分 --------------------------------
             indexTemplatePO.setTempScore(leafIndexScore);
             TaskIndexPO leafTaskIndex = TaskIndexPO.builder()
                     .id(StringUtil.getRandomUUID())
                     .pId(projectId)
                     .target(indexTemplatePO.getIndexId())
-                    .notStandardSceneNum((int) notStandardSceneNum)
+                    .notStandardSceneNum((int) notStandardSceneNumber)
                     .score(leafIndexScore)
                     .indexId(indexId)
                     .parentId(parentId)
@@ -254,8 +263,8 @@ public class TaskService {
             leafTaskIndexList.add(leafTaskIndex);
         }
         SshUtil.stop(clientScore, sessionScore);
-        // 保存任务分数
-        taskManager.batchUpdateByScoreResult(taskList);
+//        // 保存任务分数
+//        taskManager.batchUpdateByScoreResult(taskList);
         // 保存末级指标分数
         taskIndexManager.batchInsertLeafIndex(leafTaskIndexList);
         // 保存一级指标分数
@@ -263,23 +272,23 @@ public class TaskService {
         log.info("------- /state 根据每个指标的得分和权重算出各个一级指标的得分(即 project 对应的场景测试包下的一级指标)!");
         computeFirst(leafTaskIndexList, projectId);
 
-        // 调用 server 的接口,计算评价等级
-        String tokenUrl = tokenUri + "?grant_type=client_credentials"
-                + "&client_id=" + clientId
-                + "&client_secret=" + clientSecret;
-        log.info("------- /state 获取仿真云平台 token:" + tokenUrl);
-        String response = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
-        ObjectMapper objectMapper = new ObjectMapper();
-        JsonNode jsonNode = objectMapper.readTree(response);
-        String accessToken = jsonNode.path("access_token").asText();
-        log.info("------- /state 仿真云平台 token 为:" + accessToken);
-        Map<String, String> headers = new HashMap<>();
-        headers.put("Authorization", "Bearer " + accessToken);
-        Map<String, String> params = new HashMap<>();
-        params.put("id", projectId);
-        String post = HttpUtil.post(closeableHttpClient, requestConfig, evaluationLevelUri, headers, params);
-        log.info("------- /state 访问仿真云平台评价等级接口:" + evaluationLevelUri + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
-        log.info("------- /state 项目 " + projectId + " 打分完成");
+//        // 调用 server 的接口,计算评价等级
+//        String tokenUrl = tokenUri + "?grant_type=client_credentials"
+//                + "&client_id=" + clientId
+//                + "&client_secret=" + clientSecret;
+//        log.info("------- /state 获取仿真云平台 token:" + tokenUrl);
+//        String response = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
+//        ObjectMapper objectMapper = new ObjectMapper();
+//        JsonNode jsonNode = objectMapper.readTree(response);
+//        String accessToken = jsonNode.path("access_token").asText();
+//        log.info("------- /state 仿真云平台 token 为:" + accessToken);
+//        Map<String, String> headers = new HashMap<>();
+//        headers.put("Authorization", "Bearer " + accessToken);
+//        Map<String, String> params = new HashMap<>();
+//        params.put("id", projectId);
+//        String post = HttpUtil.post(closeableHttpClient, requestConfig, evaluationLevelUri, headers, params);
+//        log.info("------- /state 访问仿真云平台评价等级接口:" + evaluationLevelUri + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
+//        log.info("------- /state 项目 " + projectId + " 打分完成");
 
 
     }