martin 3 жил өмнө
parent
commit
c6733507f0

+ 6 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/ProjectMapper.java

@@ -6,6 +6,7 @@ import org.apache.ibatis.annotations.*;
 import org.apache.ibatis.type.JdbcType;
 
 import java.sql.Timestamp;
+import java.util.List;
 
 @Mapper
 public interface ProjectMapper {
@@ -55,4 +56,9 @@ public interface ProjectMapper {
             "where run_state in ('Aborted', 'PendingAnalysis', 'Terminated')\n" +
             "  and p_id = #{projectId}")
     int selectEndTaskNum(@Param("projectId") String projectId);
+
+    /**
+     * Selects the {@code id} column of every row in {@code simulation_manual_project}
+     * whose {@code now_run_state} equals the given state.
+     *
+     * @param state value compared against {@code now_run_state}
+     * @return the matching ids
+     */
+    @Select("select id\n"
+            + "from simulation_manual_project \n"
+            + "where now_run_state = #{state}")
+    List<String> selectIdByState(@Param("state") String state);
 }

+ 36 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -2,7 +2,10 @@ package com.css.simulation.resource.scheduler.scheduler;
 
 import api.common.pojo.constants.DictConstants;
 import api.common.util.CollectionUtil;
+import api.common.util.LinuxUtil;
+import api.common.util.StringUtil;
 import api.common.util.TimeUtil;
+import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import lombok.extern.slf4j.Slf4j;
@@ -12,6 +15,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import java.io.IOException;
 import java.util.List;
 
 @Component
@@ -19,12 +23,14 @@ import java.util.List;
 public class TickScheduler {
 
     @Value("${scheduler.manual-project.topic}")
-    private String manualProjectTopic;
+    String manualProjectTopic;
     @Autowired
     StringRedisTemplate redisTemplate;
 
     @Autowired
-    private TaskMapper taskMapper;
+    TaskMapper taskMapper;
+    @Autowired
+    ProjectMapper projectMapper;
 
     @Scheduled(fixedDelay = 2000)
     public void tick() {
@@ -41,6 +47,7 @@ public class TickScheduler {
             try {
                 String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId);
 //                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
+                assert s != null;
                 long tickTime = Long.parseLong(s);
                 long maxSimulationTime = task.getMaxSimulationTime() * 1000;
                 long now = TimeUtil.getNow();
@@ -57,4 +64,31 @@ public class TickScheduler {
         });
 
     }
+
+
+    /**
+     * Restarts the Kubernetes job of a running project when none of its pods are listed any more.
+     *
+     * <p>Runs with a fixed delay of 60 seconds. For every project id returned by
+     * {@code projectMapper.selectIdByState("20")}, the pod listing is filtered by
+     * {@code project-<projectId>}. When the project id does not occur in that output, the job
+     * {@code project-<projectId>} is deleted and re-applied from its YAML file under
+     * {@code /opt/simulation-cloud/simulation-resource-scheduler/job-yaml/}.
+     *
+     * <p>An {@link IOException} raised for one project is logged and does not stop the
+     * remaining projects from being checked.
+     */
+    @Scheduled(fixedDelay = 60 * 1000)
+    public void checkProject() {
+        // 1. Load the ids of the projects that are currently running
+        //    (state "20" is presumably the "running" dictionary value -- TODO confirm against DictConstants).
+        List<String> projectIdList = projectMapper.selectIdByState("20");
+        for (String projectId : projectIdList) {
+            String jobName = "project-" + projectId;
+            try {
+                // 2. List the pods whose name contains this project's job name.
+                String podListing = LinuxUtil.execute("kubectl get pod | grep " + jobName);
+                if (StringUtil.countSubString(podListing, projectId) != 0) {
+                    // The project id still appears in the pod listing: nothing to do.
+                    continue;
+                }
+                // 3. No pod is listed for this project: delete the job and re-create it from its YAML file.
+                LinuxUtil.execute("kubectl delete job " + jobName);
+                String jobYamlPath = "/opt/simulation-cloud/simulation-resource-scheduler/job-yaml/" + jobName + ".yaml";
+                LinuxUtil.execute("kubectl apply -f " + jobYamlPath);
+            } catch (IOException e) {
+                // Report through the logger (with the affected project) instead of printing to stderr.
+                log.error("------- checkProject failed to check or restart job {}", jobName, e);
+            }
+        }
+    }
 }

+ 1 - 182
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -120,7 +120,7 @@ public class TaskService {
         ProjectPO projectPO = projectMapper.selectById(taskId);
         String projectId = projectPO.getId();
         String scenePackageId = projectPO.getScenePackageId();  // 场景测试包 id,指标根 id
-        log.info("------- /state 任务 " + taskId + " 的父项目为:" + projectId);
+//        log.info("------- /state 任务 " + taskId + " 的父项目为:" + projectId);
         int taskNum = projectMapper.selectTaskNumById(projectId);
         int endTaskNum = projectMapper.selectEndTaskNum(projectId);    // 查询已结束的任务 'Aborted', 'PendingAnalysis', 'Terminated'
         projectMapper.updateTaskCompleted(projectId, endTaskNum);
@@ -270,187 +270,6 @@ public class TaskService {
 
     }
 
-//    @SneakyThrows
-//    public void taskState(String taskId, String state, String podName) {
-//        log.info("------- /state 修改任务 " + taskId + "的状态:" + state + ",pod 名称为:" + podName);
-//        if ("Running".equals(state)) {
-//            taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
-//        } else if ("Aborted".equals(state) || "Terminated".equals(state)) {
-//            LinuxUtil.execute("kubectl delete pod " + podName);
-//            taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
-//        } else if ("PendingAnalysis".equals(state)) {
-//            LinuxUtil.execute("kubectl delete pod " + podName);
-//            taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
-//        } else {
-//            taskMapper.updateState(taskId, state);
-//        }
-//        ProjectPO projectPO = projectMapper.selectById(taskId);
-//        String projectId = projectPO.getId();
-//        String scenePackageId = projectPO.getScenePackageId();  // 场景测试包 id,指标根 id
-//        log.info("------- /state 任务 " + taskId + " 的父项目为:" + projectId);
-//        int taskNum = projectMapper.selectTaskNumById(projectId);
-//        int endTaskNum = projectMapper.selectEndTaskNum(projectId);    // 查询已结束的任务 'Aborted', 'PendingAnalysis', 'Terminated'
-//        projectMapper.updateTaskCompleted(projectId, endTaskNum);
-//        log.info("------- /state 项目 " + projectId + " 完成进度为:" + endTaskNum + "/" + taskNum);
-//        if (taskNum != endTaskNum) {  // 已结束任务数等于所有任务数量,才会准备打分;否则退出。
-//            return;
-//        }
-//        projectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成
-//        LinuxUtil.execute("kubectl delete job project-" + projectId);
-//        SshClient clientKafka = SshUtil.getClient();
-//        ClientSession sessionKafka = SshUtil.getSession(clientKafka, hostnameKafka, usernameKafka, passwordKafka);
-//        String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", projectId);
-//        SshUtil.execute(sessionKafka, topicDeleteCommand);
-//        SshUtil.stop(clientKafka, sessionKafka);
-//        List<TaskPO> taskList = taskMapper.selectTaskListByProjectId(projectId);  // 所有任务信息
-//        log.info("------- /state 共有 " + taskList.size() + "个任务!");
-//        taskIndexMapper.deleteFirstByProjectId(projectId);
-//        taskIndexMapper.deleteLastByProjectId(projectId);
-//        // -------------------------------- 查询叶子指标 --------------------------------
-//        List<IndexTemplatePO> leafIndexTemplateList = indexTemplateMapper.selectLeafIndexWithRuleDetailsByPackageId(scenePackageId);
-//        List<TaskIndexPO> leafTaskIndexList = new ArrayList<>();
-//        log.info("------- /state 共有 " + leafIndexTemplateList.size() + "个叶子节点!");
-//
-//        SshClient clientScore = SshUtil.getClient();
-//        ClientSession sessionScore = SshUtil.getSession(clientScore, hostnameScore, usernameScore, passwordScore);
-//        for (int i = 0; i < leafIndexTemplateList.size(); i++) {
-//            AtomicReference<String> scoreExplain = new AtomicReference<>(); // 每个叶子指标下的任务的得分说明一样和叶子指标一致
-//            IndexTemplatePO indexTemplatePO = leafIndexTemplateList.get(i);
-//            String indexId = indexTemplatePO.getIndexId();
-//            String parentId = indexTemplatePO.getParentId();
-//            String rootId = indexTemplatePO.getRootId();
-//            String weight = indexTemplatePO.getWeight();
-//            log.info("------- /state 开始执行对第 " + (i + 1) + " 个叶子节点 " + indexId + " 进行打分!");
-//            String ruleName = indexTemplatePO.getRuleName();    // 打分脚本名称,例如 AEB_1-1
-//            String ruleDetails = indexTemplatePO.getRuleDetails();    // 打分脚本内容
-//            String ruleFilePath = pyPath + "script/" + ruleName.split("_")[0] + "/" + ruleName + ".py";
-//            log.info("------- /state 将叶子节点 " + indexId + " 对应的打分规则保存到临时目录:" + ruleFilePath);
-//            FileUtil.writeInputStreamToLocalFile(IoUtil.stringToInputStream(ruleDetails), ruleFilePath);
-//            Set<String> sceneIdSet = new HashSet<>();
-//            String naturalIds = indexTemplatePO.getSceneNaturalIds();
-//            String standardIds = indexTemplatePO.getSceneStatueIds();
-//            String accidentIds = indexTemplatePO.getSceneTrafficIds();
-//            if (StringUtil.isNotEmpty(naturalIds)) {
-//                String[] naturalIdArray = naturalIds.split(",");
-//                sceneIdSet.addAll(Arrays.asList(naturalIdArray));
-//            }
-//            if (StringUtil.isNotEmpty(standardIds)) {
-//                String[] standardArray = standardIds.split(",");
-//                sceneIdSet.addAll(Arrays.asList(standardArray));
-//            }
-//            if (StringUtil.isNotEmpty(accidentIds)) {
-//                String[] accidentIdArray = accidentIds.split(",");
-//                sceneIdSet.addAll(Arrays.asList(accidentIdArray));
-//            }
-//            int resultNumberOfCurrentIndex = sceneIdSet.size();
-//            log.info("------- /state 叶子节点 " + indexId + " 包括 " + resultNumberOfCurrentIndex + " 个场景!");
-//            log.info("------- /state 计算叶子节点 " + indexId + " 的得分!");
-//            double sum = taskList.stream()
-//                    .filter(task1 -> sceneIdSet.contains(task1.getSceneId()))
-//                    .mapToDouble(task2 -> {
-//                        String task2Id = task2.getId();
-//                        taskMapper.updateState(task2Id, DictConstants.TASK_ANALYSING);
-//                        // 计算每个任务的得分
-//
-//                        String runResultMinio = task2.getRunResultFilePath() + "/Ego.csv";
-//                        String runResultLinux = linuxTempPath + runResultMinio;
-//
-////                        python3 /home/ubuntu/test/Evaluate/main.py /home/ubuntu/test/test_data.csv 4 AEB_1-2
-////                        String command = "python3 " + pyPath + " " + runResultLinux + " " + task2.getSceneType();  // 默认使用场景名称找打分脚本
-//                        String command = "python3 " + pyPath + "main.py " + runResultLinux + " " + task2.getSceneType() + " " + ruleName; // 指定打分脚本
-//                        String scoreResult;
-//                        try {
-//                            try {
-//                                log.info("------- /state 下载 minio 上的结果文件 " + runResultMinio + " 到本地:" + runResultLinux);
-//                                MinioUtil.downloadToFile(minioClient, bucketName, runResultMinio, runResultLinux);  // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
-//                            } catch (Exception e) {
-//                                throw new RuntimeException("------- /state 下载 minio 上的结果文件出错:" + e.getMessage());
-//                            }
-//                            try {
-//                                log.info("------- /state 开始执行打分命令:" + command);
-//                                scoreResult = SshUtil.execute(sessionScore, command);
-//                                log.info("------- /state 打分结束,结果为:" + scoreResult);
-//                            } catch (IOException e) {
-//                                throw new RuntimeException("------- /state 任务 " + task2Id + " 打分出错,命令为:" + command + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
-//                            }
-//                        } catch (Exception e) {
-//                            taskMapper.updateState(task2Id, DictConstants.TASK_ABORTED);
-//                            throw new RuntimeException(e.getMessage());
-//                        }
-//                        ScoreTO score = null;
-//                        try {
-//                            String replace = StringUtil.replace(scoreResult, "'", "\"");
-//                            score = JsonUtil.jsonToBean(replace, ScoreTO.class);
-//                        } catch (JsonProcessingException e) {
-//                            e.printStackTrace();
-//                        }
-//                        assert score != null;
-//                        task2.setReturnSceneId(score.getUnit_scene_ID());
-//                        task2.setScore(score.getUnit_scene_score());
-//                        task2.setTargetEvaluate(score.getEvaluate_item());
-//                        task2.setScoreExplain(score.getScore_description());
-//                        task2.setModifyUserId(USER_ID);
-//                        task2.setModifyTime(TimeUtil.getNowForMysql());
-//                        scoreExplain.set(score.getScore_description());
-//
-//                        taskMapper.updateState(task2Id, DictConstants.TASK_COMPLETED);
-//
-//                        return score.getUnit_scene_score();
-//                    }).sum();
-//            // 计算不合格的任务数(不到100分就是不合格)
-//            long notStandardSceneNum = taskList.stream().filter(task1 -> sceneIdSet.contains(task1.getSceneId()) && task1.getScore() < 100).count();
-//            // 叶子指标的得分(叶子指标下所有场景得分的平均值)
-//            double leafIndexScore = sum / resultNumberOfCurrentIndex;
-//            // -------------------------------- 保存叶子指标得分 --------------------------------
-//            indexTemplatePO.setTempScore(leafIndexScore);
-//            TaskIndexPO leafTaskIndex = TaskIndexPO.builder()
-//                    .id(StringUtil.getRandomUUID())
-//                    .pId(projectId)
-//                    .target(indexTemplatePO.getIndexId())
-//                    .notStandardSceneNum((int) notStandardSceneNum)
-//                    .score(leafIndexScore)
-//                    .indexId(indexId)
-//                    .parentId(parentId)
-//                    .rootId(rootId)
-//                    .weight(weight)
-//                    .scoreExplain(scoreExplain.get())
-//                    .build();
-//            leafTaskIndex.setCreateUserId(USER_ID);
-//            leafTaskIndex.setCreateTime(TimeUtil.getNowForMysql());
-//            leafTaskIndex.setModifyUserId(USER_ID);
-//            leafTaskIndex.setModifyTime(TimeUtil.getNowForMysql());
-//            leafTaskIndex.setIsDeleted("0");
-//            leafTaskIndexList.add(leafTaskIndex);
-//        }
-//        SshUtil.stop(clientScore, sessionScore);
-//        // 保存任务分数
-//        taskManager.batchUpdateByScoreResult(taskList);
-//        // 保存末级指标分数
-//        taskIndexManager.batchInsertLeafIndex(leafTaskIndexList);
-//        // 保存一级指标分数
-//        log.info("------- /state 根据每个指标的得分和权重算出各个一级指标的得分(即 project 对应的场景测试包下的一级指标)!");
-//        computeFirst(leafTaskIndexList, projectId);
-//
-//        // 调用 server 的接口,计算评价等级
-//        String tokenUrl = tokenUri + "?grant_type=client_credentials"
-//                + "&client_id=" + clientId
-//                + "&client_secret=" + clientSecret;
-//        log.info("------- /state 获取仿真云平台 token:" + tokenUrl);
-//        String response = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
-//        ObjectMapper objectMapper = new ObjectMapper();
-//        JsonNode jsonNode = objectMapper.readTree(response);
-//        String accessToken = jsonNode.path("access_token").asText();
-//        log.info("------- /state 仿真云平台 token 为:" + accessToken);
-//        Map<String, String> headers = new HashMap<>();
-//        headers.put("Authorization", "Bearer " + accessToken);
-//        Map<String, String> params = new HashMap<>();
-//        params.put("id", projectId);
-//        String post = HttpUtil.post(closeableHttpClient, requestConfig, evaluationLevelUri, headers, params);
-//        log.info("------- /state 访问仿真云平台评价等级接口:" + evaluationLevelUri +",请求头为:" + headers +",请求体为:" + params + "结果为:" + post);
-//        log.info("------- /state 项目 " + projectId + " 打分完成");
-//
-//
-//    }
 
 
     public Boolean taskConfirm(String taskId) {

+ 1 - 1
simulation-resource-scheduler/src/main/resources/kubernetes/template/job/job-template.yaml

@@ -39,7 +39,7 @@ spec:
           image: algorithm-image
           imagePullPolicy: Never
           command: [ "/bin/sh", "-c", "/AEB/start_docker.sh; touch /tmp/hello.txt;while true;do /bin/echo $(date +%T) >> /tmp/hello.txt; sleep 5; done;" ]
-      restartPolicy: Never
+      # NOTE(review): assuming this template's kind is Job (per its path) -- a Job pod template
+      # only accepts restartPolicy Never or OnFailure; "Always" is rejected when the job is applied.
+      restartPolicy: OnFailure
       volumes:
         - name: nvidia0
           hostPath: