martin 3 năm trước cách đây
mục cha
commit
879fb6d28b

+ 2 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/configuration/kubernetes/KubernetesConfiguration.java

@@ -5,7 +5,6 @@ import io.kubernetes.client.util.ClientBuilder;
 import io.kubernetes.client.util.KubeConfig;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.util.ResourceUtils;
 
 import java.io.File;
 import java.io.FileReader;
@@ -16,9 +15,9 @@ public class KubernetesConfiguration {
 
     @Bean
     public ApiClient apiClient() throws IOException {
-        File config = ResourceUtils.getFile("classpath:kubernetes/config");  // 开发环境可用,生产环境不行,无法从jar 包读取
+//        File config = ResourceUtils.getFile("classpath:kubernetes/config");  // 开发环境可用,生产环境不行,无法从jar 包读取
 //        File config = new File("D:\\idea-project\\simulation-cloud\\simulation-resource-scheduler\\src\\main\\resources\\kubernetes\\config");  //windows
-//        File config = new File("/root/.kube/config");   //linux
+        File config = new File("/root/.kube/config");   //linux
 //
 //        ClassPathResource classPathResource = new ClassPathResource("kubernetes/config");
 //        InputStream inputStream = classPathResource.getInputStream();

Những thai đổi đã bị hủy bỏ vì nó quá lớn
+ 0 - 266
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java


+ 8 - 6
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/TaskController.java

@@ -1,14 +1,16 @@
 package com.css.simulation.resource.scheduler.controller;
 
 
-import api.common.pojo.dto.ProjectMessageDTO;
 import api.common.util.IoUtil;
 import com.css.simulation.resource.scheduler.consumer.ManualProjectConsumer;
 import com.css.simulation.resource.scheduler.service.TaskService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.cloud.context.config.annotation.RefreshScope;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
 
 import javax.servlet.http.HttpServletResponse;
 import java.io.FileInputStream;
@@ -42,10 +44,10 @@ public class TaskController {
 
 
 
-    @PostMapping("/test")
-    public void test(@RequestBody ProjectMessageDTO projectMessageDTO) {
-        manualProjectConsumer.parseProject1(projectMessageDTO);
-    }
+//    @PostMapping("/test")
+//    public void test(@RequestBody ProjectMessageDTO projectMessageDTO) {
+//        manualProjectConsumer.parseProject1(projectMessageDTO);
+//    }
 
     /**
      * Pod 的心跳接口

+ 2 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -50,7 +50,8 @@ public interface TaskMapper {
 
     @Update("update simulation_manual_project_task\n" +
             "set run_state = #{runState},run_end_time = #{runStopTime},run_result='Failed'\n" +
-            "where id = #{id}")
+            "where id = #{id}" +
+            " and score is null")
     void updateFailStateWithStopTime(@Param("id") String id, @Param("runState") String runState,@Param("runStopTime") Timestamp runStopTime);
 
     @Update("update simulation_manual_project_task\n" +

+ 99 - 90
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -1,13 +1,22 @@
 package com.css.simulation.resource.scheduler.scheduler;
 
+import api.common.pojo.constants.DictConstants;
+import api.common.util.*;
 import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.session.ClientSession;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import java.io.IOException;
+import java.util.List;
+
 @Component
 @Slf4j
 public class TickScheduler {
@@ -24,94 +33,94 @@ public class TickScheduler {
     @Value("${scheduler.manual-project.job-yaml}")
     String jobYaml;
 
-//    @Scheduled(fixedDelay = 2000)
-//    public void tick() {
-//
-//        List<TaskPO> executingTaskList = taskMapper.selectExecuting();
-//        if (CollectionUtil.isEmpty(executingTaskList)) {
-//            return;
-//        }
-////        log.info("------- TickScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
-//        //2 根据 key 查出任务的心跳时间
-//        executingTaskList.forEach(task -> {
-//            String taskId = task.getId();
-//            String projectId = task.getPId();
-//            try {
-//                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId);
-////                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
-//                assert s != null;
-//                long tickTime = Long.parseLong(s);
-//                long maxSimulationTime = task.getMaxSimulationTime() * 1000;
-//                long now = TimeUtil.getNow();
-//                long difference = now - tickTime;
-////                log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
-//                if (difference > maxSimulationTime) {
-////                    log.info("------- TickScheduler 任务" + taskId + "已超时,状态修改为:" + DictConstants.TASK_ABORTED);
-//                    taskMapper.updateState(taskId, DictConstants.TASK_ABORTED);
-//                }
-//            } catch (Exception e) {
-//                throw new RuntimeException(e.getMessage());
-//            }
-//
-//        });
-//
-//    }
-//
-//
-//    /**
-//     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
-//     */
-//    @Scheduled(fixedDelay = 60 * 1000)
-//    public void checkProject() throws IOException {
-//        SshClient client = SshUtil.getClient();
-//        ClientSession session = SshUtil.getSession(client, "182.92.203.182", "root", "CICV2022test");
-//
-//        //1 查询出正在运行中的 project
-//        List<String> projectIdList = projectMapper.selectIdByState("20");
-//        log.info("TickScheduler-------checkProject 查询出正在运行中的 project" + projectIdList);
-//        //2 根据 projectId 获取 pod
-//        projectIdList.forEach(projectId -> {
-//
-//            String key = manualProjectTopic + ":" + projectId + ":check";
-//            String nowString = TimeUtil.getNowString();
-//
-//            try {
-//                String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
-//                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-//                int taskNumber = StringUtil.countSubString(podList, "project");
-//                if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
-//                    redisTemplate.opsForValue().set(key, nowString);
-//                }
-//
-//                if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {
-//                    // 判断两次是否超过2分钟
-//                    //3 如果 pod 为空,则重启 job
-//                    long lastNow = Long.parseLong(lastNowString);
-//                    long now = Long.parseLong(nowString);
-//                    if (now - lastNow > 2L * 60 * 1000) {
-//                        LinuxUtil.execute("kubectl delete job project-" + projectId);
-//                        while (true) {
-//                            log.info("TickScheduler-------checkProject 准备重启项目 " + projectId);
-//                            Thread.sleep(10000);
-//                            String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-//                            log.info("TickScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:" + podList2);
-//                            int taskNumber2 = StringUtil.countSubString(podList2, "project");
-//                            if (taskNumber2 == 0) {
-//                                break;
-//                            }
-//                        }
-//                        log.info("TickScheduler-------checkProject 重新执行项目" + projectId);
-//                        String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
-//                        LinuxUtil.execute("kubectl apply -f " + jobTemplateYamlPathTarget);
-//                    }
-//                }
-//            } catch (IOException | InterruptedException e) {
-//                e.printStackTrace();
-//            }
-//        });
-//
-//        session.close();
-//        client.stop();
-//
-//    }
+    @Scheduled(fixedDelay = 60 * 1000)
+    public void tick() {
+
+        List<TaskPO> executingTaskList = taskMapper.selectExecuting();
+        if (CollectionUtil.isEmpty(executingTaskList)) {
+            return;
+        }
+//        log.info("------- TickScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
+        //2 根据 key 查出任务的心跳时间
+        executingTaskList.forEach(task -> {
+            String taskId = task.getId();
+            String projectId = task.getPId();
+            try {
+                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId);
+//                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
+                assert s != null;
+                long tickTime = Long.parseLong(s);
+                long maxSimulationTime = task.getMaxSimulationTime() * 1000;
+                long now = TimeUtil.getNow();
+                long difference = now - tickTime;
+//                log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
+                if (difference > maxSimulationTime) {
+//                    log.info("------- TickScheduler 任务" + taskId + "已超时,状态修改为:" + DictConstants.TASK_ABORTED);
+                    taskMapper.updateState(taskId, DictConstants.TASK_ABORTED);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e.getMessage());
+            }
+
+        });
+
+    }
+
+
+    /**
+     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
+     */
+    @Scheduled(fixedDelay = 60 * 1000)
+    public void checkProject() throws IOException {
+        SshClient client = SshUtil.getClient();
+        ClientSession session = SshUtil.getSession(client, "182.92.203.182", "root", "CICV2022test");
+
+        //1 查询出正在运行中的 project
+        List<String> projectIdList = projectMapper.selectIdByState("20");
+        log.info("TickScheduler-------checkProject 查询出正在运行中的 project" + projectIdList);
+        //2 根据 projectId 获取 pod
+        projectIdList.forEach(projectId -> {
+
+            String key = manualProjectTopic + ":" + projectId + ":check";
+            String nowString = TimeUtil.getNowString();
+
+            try {
+                String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
+                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+                int taskNumber = StringUtil.countSubString(podList, "project");
+                if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
+                    redisTemplate.opsForValue().set(key, nowString);
+                }
+
+                if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {
+                    // 判断两次是否超过2分钟
+                    //3 如果 pod 为空,则重启 job
+                    long lastNow = Long.parseLong(lastNowString);
+                    long now = Long.parseLong(nowString);
+                    if (now - lastNow > 2L * 60 * 1000) {
+                        LinuxUtil.execute("kubectl delete job project-" + projectId);
+                        Thread.sleep(30000);
+                        while (true) {
+                            log.info("TickScheduler-------checkProject 准备重启项目 " + projectId);
+                            String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+                            log.info("TickScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:" + podList2);
+                            int taskNumber2 = StringUtil.countSubString(podList2, "project");
+                            if (taskNumber2 == 0) {
+                                break;
+                            }
+                        }
+                        log.info("TickScheduler-------checkProject 重新执行项目" + projectId);
+                        String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
+                        LinuxUtil.execute("kubectl apply -f " + jobTemplateYamlPathTarget);
+                    }
+                }
+            } catch (IOException | InterruptedException e) {
+                e.printStackTrace();
+            }
+        });
+
+        session.close();
+        client.stop();
+
+    }
 }

+ 57 - 44
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -118,6 +118,9 @@ public class TaskService {
             taskMapper.updateState(taskId, state);
         }
         ProjectPO projectPO = projectMapper.selectById(taskId);
+        if (projectPO == null) {
+            return;
+        }
         String projectId = projectPO.getId();
         String scenePackageId = projectPO.getScenePackageId();  // 场景测试包 id,指标根 id
 //        log.info("------- /state 任务 " + taskId + " 的父项目为:" + projectId);
@@ -160,65 +163,75 @@ public class TaskService {
             log.info("------- /state 将叶子节点 " + indexId + " 对应的打分规则保存到临时目录:" + ruleFilePath);
             FileUtil.writeInputStreamToLocalFile(IoUtil.stringToInputStream(ruleDetails), ruleFilePath);
 
-            List<TaskPO> taskListOfLeafIndex = taskList.stream().filter(task -> indexId.equals(task.getLastTargetId())).collect(Collectors.toList());
+            List<TaskPO> taskListOfLeafIndex = taskList.stream()
+                    .filter(task -> indexId.equals(task.getLastTargetId()) && DictConstants.TASK_ANALYSIS.equals(task.getRunState()))
+                    .collect(Collectors.toList());
             int resultNumberOfCurrentIndex = taskListOfLeafIndex.size();
             log.info("------- /state 叶子节点 " + indexId + " 包括 " + resultNumberOfCurrentIndex + " 个任务!");
             log.info("------- /state 计算叶子节点 " + indexId + " 的得分!");
             double sum = taskListOfLeafIndex.stream()
                     .mapToDouble(task2 -> {
-                        String task2Id = task2.getId();
-                        taskMapper.updateState(task2Id, DictConstants.TASK_ANALYSING);
-                        // 计算每个任务的得分
+                        String runState = task2.getRunState();
+                        if (!DictConstants.TASK_ANALYSIS.equals(runState)) {
+                            return 0.0;
+                        } else {
+                            String task2Id = task2.getId();
+                            taskMapper.updateState(task2Id, DictConstants.TASK_ANALYSING);
+                            // 计算每个任务的得分
 
-                        String runResultMinio = task2.getRunResultFilePath() + "/Ego.csv";
-                        String runResultLinux = linuxTempPath + runResultMinio;
+                            String runResultMinio = task2.getRunResultFilePath() + "/Ego.csv";
+                            String runResultLinux = linuxTempPath + runResultMinio;
 
 //                        python3 /home/ubuntu/test/Evaluate/main.py /home/ubuntu/test/test_data.csv 4 AEB_1-2
 //                        String command = "python3 " + pyPath + " " + runResultLinux + " " + task2.getSceneType();  // 默认使用场景名称找打分脚本
-                        String command = "python3 " + pyPath + "main.py " + runResultLinux + " " + task2.getSceneType() + " " + ruleName; // 指定打分脚本
-                        String scoreResult;
-                        try {
+                            String command = "python3 " + pyPath + "main.py " + runResultLinux + " " + task2.getSceneType() + " " + ruleName; // 指定打分脚本
+                            String scoreResult;
                             try {
-                                log.info("------- /state 下载 minio 上的结果文件 " + runResultMinio + " 到本地:" + runResultLinux);
-                                MinioUtil.downloadToFile(minioClient, bucketName, runResultMinio, runResultLinux);  // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
+                                try {
+                                    log.info("------- /state 下载 minio 上的结果文件 " + runResultMinio + " 到本地:" + runResultLinux);
+                                    MinioUtil.downloadToFile(minioClient, bucketName, runResultMinio, runResultLinux);  // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
+                                } catch (Exception e) {
+                                    throw new RuntimeException("------- /state 下载 minio 上的结果文件出错:" + e.getMessage());
+                                }
+                                try {
+                                    log.info("------- /state 开始执行打分命令:" + command);
+                                    scoreResult = SshUtil.execute(sessionScore, command);
+                                    log.info("------- /state 打分结束,结果为:" + scoreResult);
+                                } catch (IOException e) {
+                                    throw new RuntimeException("------- /state 任务 " + task2Id + " 打分出错,命令为:" + command + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
+                                }
                             } catch (Exception e) {
-                                throw new RuntimeException("------- /state 下载 minio 上的结果文件出错:" + e.getMessage());
+                                taskMapper.updateState(task2Id, DictConstants.TASK_ABORTED);
+                                throw new RuntimeException(e.getMessage());
                             }
+                            ScoreTO score = null;
                             try {
-                                log.info("------- /state 开始执行打分命令:" + command);
-                                scoreResult = SshUtil.execute(sessionScore, command);
-                                log.info("------- /state 打分结束,结果为:" + scoreResult);
-                            } catch (IOException e) {
-                                throw new RuntimeException("------- /state 任务 " + task2Id + " 打分出错,命令为:" + command + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
+                                String replace = StringUtil.replace(scoreResult, "'", "\"");
+                                score = JsonUtil.jsonToBean(replace, ScoreTO.class);
+                            } catch (JsonProcessingException e) {
+                                e.printStackTrace();
                             }
-                        } catch (Exception e) {
-                            taskMapper.updateState(task2Id, DictConstants.TASK_ABORTED);
-                            throw new RuntimeException(e.getMessage());
-                        }
-                        ScoreTO score = null;
-                        try {
-                            String replace = StringUtil.replace(scoreResult, "'", "\"");
-                            score = JsonUtil.jsonToBean(replace, ScoreTO.class);
-                        } catch (JsonProcessingException e) {
-                            e.printStackTrace();
-                        }
-                        assert score != null;
-                        task2.setReturnSceneId(score.getUnit_scene_ID());
-                        task2.setScore(score.getUnit_scene_score());
-                        task2.setTargetEvaluate(score.getEvaluate_item());
-                        task2.setScoreExplain(score.getScore_description());
-                        task2.setModifyUserId(USER_ID);
-                        task2.setModifyTime(TimeUtil.getNowForMysql());
-                        scoreExplain.set(score.getScore_description());
+                            assert score != null;
+                            task2.setReturnSceneId(score.getUnit_scene_ID());
+                            task2.setScore(score.getUnit_scene_score());
+                            task2.setTargetEvaluate(score.getEvaluate_item());
+                            task2.setScoreExplain(score.getScore_description());
+                            task2.setModifyUserId(USER_ID);
+                            task2.setModifyTime(TimeUtil.getNowForMysql());
+                            scoreExplain.set(score.getScore_description());
 
-                        taskMapper.updateState(task2Id, DictConstants.TASK_COMPLETED);
+                            taskMapper.updateState(task2Id, DictConstants.TASK_COMPLETED);
+
+                            return score.getUnit_scene_score();
+                        }
 
-                        return score.getUnit_scene_score();
                     }).sum();
-            // 计算不合格的任务数(不到100分就是不合格)
-            long notStandardSceneNum = taskListOfLeafIndex.stream().filter(task -> task.getScore() < 100).count();
-            // 叶子指标的得分(叶子指标下所有场景得分的平均值)
-            double leafIndexScore = sum / resultNumberOfCurrentIndex;
+            // 计算不合格的任务数(不到100分就是不合格,执行失败的不算)
+            long notStandardSceneNum = taskListOfLeafIndex.stream()
+                    .filter(task -> task.getScore() < 100 && task.getScore() != 0.0 && DictConstants.TASK_COMPLETED.equals(task.getRunState()))
+                    .count();
+            // 叶子指标的得分(叶子指标下所有场景得分的平均值,执行失败的不算)
+            double leafIndexScore = (int) (sum / resultNumberOfCurrentIndex * 100) / 100.0;
             // -------------------------------- 保存叶子指标得分 --------------------------------
             indexTemplatePO.setTempScore(leafIndexScore);
             TaskIndexPO leafTaskIndex = TaskIndexPO.builder()
@@ -246,6 +259,7 @@ public class TaskService {
         // 保存末级指标分数
         taskIndexManager.batchInsertLeafIndex(leafTaskIndexList);
         // 保存一级指标分数
+        log.info("------- /state 项目 " + projectId + " 的所有任务分数为:" + taskList);
         log.info("------- /state 根据每个指标的得分和权重算出各个一级指标的得分(即 project 对应的场景测试包下的一级指标)!");
         computeFirst(leafTaskIndexList, projectId);
 
@@ -271,7 +285,6 @@ public class TaskService {
     }
 
 
-
     public Boolean taskConfirm(String taskId) {
         // 查询 task 如果不是 pending 则不执行
         String state = taskMapper.selectStateById(taskId);
@@ -314,7 +327,7 @@ public class TaskService {
                         .id(StringUtil.getRandomUUID())
                         .pId(projectId)
                         .target(indexTemplate.getIndexId())
-                        .score(parentScore)
+                        .score(((int) parentScore * 100) / 100.0)
                         .indexId(indexTemplate.getIndexId())
                         .parentId(indexTemplate.getParentId())
                         .rootId(indexTemplate.getRootId())

Một số tệp đã không được hiển thị bởi vì quá nhiều tập tin thay đổi trong này khác