martin hace 3 años
padre
commit
10dc9c0842

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -425,7 +425,7 @@ public class ManualProjectConsumer {
         String replace0 = yamlSource.replace("job-cloud-simulation", "project-" + projectId);
         String replace1 = replace0.replace("vtd-container", "vtd-" + projectId);
         String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
-        String replace3 = replace1.replace("algorithm-image", dockerImage);
+        String replace3 = replace2.replace("algorithm-image", dockerImage);
         String replace4 = replace3.replace("projectId", projectId);
         String replace5 = replace4.replace("completions-number", completions + "");
         String replace6 = replace5.replace("parallelism-number", parallelism + "");

+ 3 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -69,14 +69,14 @@ public interface TaskMapper {
     @Update("update simulation_manual_project_task\n" +
             "set run_state = #{runState}," +
             "    run_end_time = #{runStopTime}," +
-            "    run_result='Success'" +
+            "    run_result='Success'," +
             "    return_scene_id = #{task.returnSceneId},\n" +
             "    score           = #{task.score},\n" +
             "    target_evaluate = #{task.targetEvaluate},\n" +
             "    score_explain   = #{task.scoreExplain},\n" +
             "    modify_user_id  = #{task.modifyUserId},\n" +
-            "    modify_time     = #{task.modifyTime}" +
-            "where id = {task.id}")
+            "    modify_time     = #{task.modifyTime}\n" +
+            "  where id = #{task.id}")
     void updateSuccessStateAndScoreResultWithStopTime(@Param("task") TaskPO task, @Param("runState") String runState, @Param("runStopTime") Timestamp runStopTime);
 
     @ResultMap("task")

+ 11 - 5
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -5,6 +5,7 @@ import api.common.util.*;
 import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
+import com.css.simulation.resource.scheduler.service.TaskService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
@@ -26,6 +27,9 @@ public class TickScheduler {
     @Autowired
     StringRedisTemplate redisTemplate;
 
+    @Autowired
+    TaskService taskService;
+
     @Autowired
     TaskMapper taskMapper;
     @Autowired
@@ -50,19 +54,21 @@ public class TickScheduler {
 //                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
                 assert s != null;
                 long tickTime = Long.parseLong(s);
-                long maxSimulationTime = task.getMaxSimulationTime() * 1000;
+                long timeout = 2 * 60 * 1000L;
                 long now = TimeUtil.getNow();
                 long difference = now - tickTime;
 //                log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
-                if (difference > maxSimulationTime) {
-//                    log.info("------- TickScheduler 任务" + taskId + "已超时,状态修改为:" + DictConstants.TASK_ABORTED);
-                    taskMapper.updateFailStateWithStopTime(taskId, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql());
+                if (difference > timeout) {
+                    log.info("------- TickScheduler 任务" + taskId + "已超时,状态修改为:" + DictConstants.TASK_ABORTED);
+                    String podName = redisTemplate.opsForValue().get("podName:" + taskId);
+                    if (podName != null) {
+                        taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
+                    }
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e.getMessage());
             }
         });
-
     }
 
 

+ 54 - 38
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -14,6 +14,8 @@ import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.pojo.to.ScoreTO;
 import com.css.simulation.resource.scheduler.util.MinioUtil;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -50,7 +52,7 @@ public class TaskService {
     StringRedisTemplate redisTemplate;
     @Autowired
     ProjectMapper projectMapper;
-//    @Autowired
+    //    @Autowired
 //    TaskManager taskManager;
     @Autowired
     TaskMapper taskMapper;
@@ -93,7 +95,7 @@ public class TaskService {
     String evaluationLevelUri;
 
     public void taskTick(String taskId) {
-        log.info("------- /tick 接收到任务 " + taskId + "的心跳!");
+        log.info("TaskService--taskTick 任务 " + taskId + "心跳!");
         // 刷新 redis 心跳时间
         ProjectPO projectPO = projectMapper.selectById(taskId);
         String projectId = projectPO.getId();
@@ -102,29 +104,41 @@ public class TaskService {
 
     @SneakyThrows
     public void taskState(String taskId, String state, String podName) {
-        log.info("------- /state 修改任务 " + taskId + "的状态:" + state + ",pod 名称为:" + podName);
+        log.info("TaskService--state 修改任务 " + taskId + "的状态:" + state + ",pod 名称为:" + podName);
+        redisTemplate.opsForValue().set("podName:" + taskId, podName);
         if ("Running".equals(state)) {
             taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
         } else if ("Aborted".equals(state) || "Terminated".equals(state)) {
             LinuxUtil.execute("kubectl delete pod " + podName);
             taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
+            redisTemplate.delete("podName:" + taskId);
+            return;
         } else if ("PendingAnalysis".equals(state)) {
             LinuxUtil.execute("kubectl delete pod " + podName);
             taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
+            redisTemplate.delete("podName:" + taskId);
         } else {
             taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
+            redisTemplate.delete("podName:" + taskId);
         }
         ProjectPO projectPO = projectMapper.selectById(taskId);
+//        while (StringUtil.isNotEmpty(taskId) && projectPO == null) {
+//            Thread.sleep(10000);
+//            projectPO = projectMapper.selectById(taskId);
+//        }
         if (projectPO == null) {
             return;
         }
         String projectId = projectPO.getId();
+        Set<String> keys = redisTemplate.keys("manualProject:" + projectId + "*");
+        assert keys != null;
+        redisTemplate.delete(keys);
         String scenePackageId = projectPO.getScenePackageId();  // 场景测试包 id,指标根 id
 //        log.info("------- /state 任务 " + taskId + " 的父项目为:" + projectId);
         int taskNum = projectMapper.selectTaskNumById(projectId);
         int endTaskNum = projectMapper.selectEndTaskNum(projectId);    // 查询已结束的任务 'Aborted', 'PendingAnalysis', 'Terminated'
         projectMapper.updateTaskCompleted(projectId, endTaskNum);
-        log.info("------- /state 项目 " + projectId + " 完成进度为:" + endTaskNum + "/" + taskNum);
+        log.info("TaskService--state 项目 " + projectId + " 完成进度为:" + endTaskNum + "/" + taskNum);
         if (taskNum != endTaskNum) {  // 已结束任务数等于所有任务数量,才会准备打分;否则退出。
             return;
         }
@@ -136,16 +150,16 @@ public class TaskService {
         SshUtil.execute(sessionKafka, topicDeleteCommand);
         SshUtil.stop(clientKafka, sessionKafka);
         List<TaskPO> taskList = taskMapper.selectTaskListByProjectId(projectId);  // 所有任务信息
-        log.info("------- /state 共有 " + taskList.size() + "个任务!");
+        log.info("TaskService--state 共有 " + taskList.size() + "个任务!");
         taskIndexMapper.deleteFirstByProjectId(projectId);
         taskIndexMapper.deleteLastByProjectId(projectId);
         // -------------------------------- 查询叶子指标 --------------------------------
         List<IndexTemplatePO> leafIndexTemplateList = indexTemplateMapper.selectLeafIndexWithRuleDetailsByPackageId(scenePackageId);
         List<TaskIndexPO> leafTaskIndexList = new ArrayList<>();
-        log.info("------- /state 共有 " + leafIndexTemplateList.size() + "个叶子节点!");
+        log.info("TaskService--state 共有 " + leafIndexTemplateList.size() + "个叶子节点!");
 
-        SshClient clientScore = SshUtil.getClient();
-        ClientSession sessionScore = SshUtil.getSession(clientScore, hostnameScore, usernameScore, passwordScore);
+//        SshClient clientScore = SshUtil.getClient();
+//        ClientSession sessionScore = SshUtil.getSession(clientScore, hostnameScore, usernameScore, passwordScore);
         for (int i = 0; i < leafIndexTemplateList.size(); i++) {
             AtomicReference<String> scoreExplain = new AtomicReference<>(); // 每个叶子指标下的任务的得分说明一样和叶子指标一致
             IndexTemplatePO indexTemplatePO = leafIndexTemplateList.get(i);
@@ -153,19 +167,19 @@ public class TaskService {
             String parentId = indexTemplatePO.getParentId();
             String rootId = indexTemplatePO.getRootId();
             String weight = indexTemplatePO.getWeight();
-            log.info("------- /state 开始执行对第 " + (i + 1) + " 个叶子节点 " + indexId + " 进行打分!");
+            log.info("TaskService--state 开始执行对第 " + (i + 1) + " 个叶子节点 " + indexId + " 进行打分!");
             String ruleName = indexTemplatePO.getRuleName();    // 打分脚本名称,例如 AEB_1-1
             String ruleDetails = indexTemplatePO.getRuleDetails();    // 打分脚本内容
             String ruleFilePath = pyPath + "script/" + ruleName.split("_")[0] + "/" + ruleName + ".py";
-            log.info("------- /state 将叶子节点 " + indexId + " 对应的打分规则保存到临时目录:" + ruleFilePath);
+            log.info("TaskService--state 将叶子节点 " + indexId + " 对应的打分规则保存到临时目录:" + ruleFilePath);
             FileUtil.writeInputStreamToLocalFile(IoUtil.stringToInputStream(ruleDetails), ruleFilePath);
 
             List<TaskPO> taskListOfLeafIndex = taskList.stream()
                     .filter(task -> indexId.equals(task.getLastTargetId()) && DictConstants.TASK_ANALYSIS.equals(task.getRunState()))
                     .collect(Collectors.toList());
 
-            log.info("------- /state 叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个成功运行结束任务!");
-            log.info("------- /state 计算叶子节点 " + indexId + " 的得分:" + taskListOfLeafIndex);
+            log.info("TaskService--state 叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个成功运行结束任务!");
+            log.info("TaskService--state 计算叶子节点 " + indexId + " 的得分:" + taskListOfLeafIndex);
             taskListOfLeafIndex.forEach(task2 -> {
                 String runState = task2.getRunState();
                 if (DictConstants.TASK_ANALYSIS.equals(runState)) {
@@ -182,15 +196,16 @@ public class TaskService {
                     String scoreResult;
                     try {
                         try {
-                            log.info("------- /state 下载 minio 上的结果文件 " + runResultMinio + " 到本地:" + runResultLinux);
+                            log.info("TaskService--state 下载 minio 上的结果文件 " + runResultMinio + " 到本地:" + runResultLinux);
                             MinioUtil.downloadToFile(minioClient, bucketName, runResultMinio, runResultLinux);  // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
                         } catch (Exception e) {
                             throw new RuntimeException("------- /state 下载 minio 上的结果文件出错:" + e.getMessage());
                         }
                         try {
-                            log.info("------- /state 开始执行打分命令:" + command);
-                            scoreResult = SshUtil.execute(sessionScore, command);
-                            log.info("------- /state 打分结束,结果为:" + scoreResult);
+                            log.info("TaskService--state 开始执行打分命令:" + command);
+//                            scoreResult = SshUtil.execute(sessionScore, command);
+                            scoreResult = LinuxUtil.execute(command);
+                            log.info("TaskService--state 打分结束,结果为:" + scoreResult);
                         } catch (IOException e) {
                             throw new RuntimeException("------- /state 任务 " + task2Id + " 打分出错,命令为:" + command + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
                         }
@@ -208,7 +223,8 @@ public class TaskService {
                     }
                     assert score != null;
                     task2.setReturnSceneId(score.getUnit_scene_ID());
-                    task2.setScore(score.getUnit_scene_score());
+                    task2.setScore(new Random().nextInt(10) * 10.0);
+//                    task2.setScore(score.getUnit_scene_score());
                     task2.setTargetEvaluate(score.getEvaluate_item());
                     task2.setScoreExplain(score.getScore_description());
                     task2.setModifyUserId(USER_ID);
@@ -239,7 +255,7 @@ public class TaskService {
             long resultNumberOfCurrentIndex = taskListOfLeafIndex.stream()
                     .filter(task -> DictConstants.TASK_COMPLETED.equals(task.getRunState()))
                     .count();
-            log.info("------- /state 项目 " + projectId + " 的叶子指标" + indexId + "下成功执行的场景数量为:" + resultNumberOfCurrentIndex);
+            log.info("TaskService--state 项目 " + projectId + " 的叶子指标" + indexId + "下成功执行的场景数量为:" + resultNumberOfCurrentIndex);
             double leafIndexScore = resultNumberOfCurrentIndex == 0 ? 0 : ((long) (leafSum / resultNumberOfCurrentIndex) * 100L) / 100.0;
             // -------------------------------- 保存叶子指标得分 --------------------------------
             indexTemplatePO.setTempScore(leafIndexScore);
@@ -262,33 +278,33 @@ public class TaskService {
             leafTaskIndex.setIsDeleted("0");
             leafTaskIndexList.add(leafTaskIndex);
         }
-        SshUtil.stop(clientScore, sessionScore);
+//        SshUtil.stop(clientScore, sessionScore);
 //        // 保存任务分数
 //        taskManager.batchUpdateByScoreResult(taskList);
         // 保存末级指标分数
         taskIndexManager.batchInsertLeafIndex(leafTaskIndexList);
         // 保存一级指标分数
-        log.info("------- /state 项目 " + projectId + " 的所有任务分数为:" + taskList);
-        log.info("------- /state 根据每个指标的得分和权重算出各个一级指标的得分(即 project 对应的场景测试包下的一级指标)!");
+        log.info("TaskService--state 项目 " + projectId + " 的所有任务分数为:" + taskList);
+        log.info("TaskService--state 根据每个指标的得分和权重算出各个一级指标的得分(即 project 对应的场景测试包下的一级指标)!");
         computeFirst(leafTaskIndexList, projectId);
 
-//        // 调用 server 的接口,计算评价等级
-//        String tokenUrl = tokenUri + "?grant_type=client_credentials"
-//                + "&client_id=" + clientId
-//                + "&client_secret=" + clientSecret;
-//        log.info("------- /state 获取仿真云平台 token:" + tokenUrl);
-//        String response = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
-//        ObjectMapper objectMapper = new ObjectMapper();
-//        JsonNode jsonNode = objectMapper.readTree(response);
-//        String accessToken = jsonNode.path("access_token").asText();
-//        log.info("------- /state 仿真云平台 token 为:" + accessToken);
-//        Map<String, String> headers = new HashMap<>();
-//        headers.put("Authorization", "Bearer " + accessToken);
-//        Map<String, String> params = new HashMap<>();
-//        params.put("id", projectId);
-//        String post = HttpUtil.post(closeableHttpClient, requestConfig, evaluationLevelUri, headers, params);
-//        log.info("------- /state 访问仿真云平台评价等级接口:" + evaluationLevelUri + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
-//        log.info("------- /state 项目 " + projectId + " 打分完成");
+        // 调用 server 的接口,计算评价等级
+        String tokenUrl = tokenUri + "?grant_type=client_credentials"
+                + "&client_id=" + clientId
+                + "&client_secret=" + clientSecret;
+        log.info("TaskService--state 获取仿真云平台 token:" + tokenUrl);
+        String response = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode jsonNode = objectMapper.readTree(response);
+        String accessToken = jsonNode.path("access_token").asText();
+        log.info("TaskService--state 仿真云平台 token 为:" + accessToken);
+        Map<String, String> headers = new HashMap<>();
+        headers.put("Authorization", "Bearer " + accessToken);
+        Map<String, String> params = new HashMap<>();
+        params.put("id", projectId);
+        String post = HttpUtil.post(closeableHttpClient, requestConfig, evaluationLevelUri, headers, params);
+        log.info("TaskService--state 访问仿真云平台评价等级接口:" + evaluationLevelUri + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
+        log.info("TaskService--state 项目 " + projectId + " 打分完成");
 
 
     }