martin 3 năm trước cách đây
mục cha
commit
17c78bcbfe

+ 24 - 23
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -103,48 +103,49 @@ public class ManualProjectConsumer {
         UserPO userPO = userMapper.selectById(userId);
         String roleCode = userPO.getRoleCode();
         String useType = userPO.getUseType();
+        ClusterPO clusterPO;
         if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
-            parseManualProject(projectRecord);
+            parseManualProject(projectJson);
+            return;
         } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
             // 获取拥有的节点数量,即仿真软件证书数量
-            ClusterPO clusterPO = clusterMapper.selectByUserId(userId);
-            String clusterId = clusterPO.getId();
-            int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
-            // 获取该用户正在运行的项目数量
-            Set<String> runningProjectSet = redisTemplate.keys(manualProjectTopic + ":" + userId + ":monopoly:running" + "*");
-            int runningProjectNumber = CollectionUtil.isEmpty(runningProjectSet) ? 0 : runningProjectSet.size();
-            if (runningProjectNumber < simulationLicenseNumber) {
-                redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":running" + projectId, projectJson);
-                parseManualProject(projectRecord);
-            } else {
-                redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":waiting" + projectId, projectJson);
-            }
-
+            clusterPO = clusterMapper.selectByUserId(userId);
         } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
             if (DictConstants.USE_TYPE_EXCLUSIVE.equals(useType)) {   //3-2 普通子账户,根据自己的独占节点排队
-
+                clusterPO = clusterMapper.selectByUserId(userId);
             } else {    // 共享子账户需要查询父账户的集群 id
-
+                String parentUserId = userPO.getCreateUserId();
+                clusterPO = clusterMapper.selectByUserId(parentUserId);
             }
-            parseManualProject(projectRecord);
         } else {
-            parseManualProject(projectRecord);
+            parseManualProject(projectJson);
+            return;
+        }
+        // 获取拥有的节点数量,即仿真软件证书数量
+        String clusterId = clusterPO.getId();
+        int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
+        // 获取该集群中正在运行的项目数量
+        Set<String> runningProjectSet = redisTemplate.keys(manualProjectTopic + ":cluster:" + clusterId + ":running" + "*");
+        int runningProjectNumber = CollectionUtil.isEmpty(runningProjectSet) ? 0 : runningProjectSet.size();
+        if (runningProjectNumber < simulationLicenseNumber) {
+            redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":running" + projectId, projectJson);
+            parseManualProject(projectJson);
+        } else {
+            redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":waiting" + projectId, projectJson);
         }
-
     }
 
 
     /**
      * 开始执行以及重新执行
      *
-     * @param projectRecord 项目启动消息
+     * @param projectJson 项目启动消息
      */
     @SneakyThrows
-    public void parseManualProject(ConsumerRecord<String, String> projectRecord) {
+    public void parseManualProject( String projectJson) {
 
         // -------------------------------- 0 准备 --------------------------------
-        log.info("------- ManualProjectConsumer 接收到项目开始消息为:" + projectRecord);
-        String projectJson = projectRecord.value();
+        log.info("------- ManualProjectConsumer 接收到项目开始消息为:" + projectJson);
         //1 读取 kafka 的 project 信息
         ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
         String projectId = projectMessageDTO.getProjectId();    // 项目 id

+ 420 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -0,0 +1,420 @@
+package com.css.simulation.resource.scheduler.manager;
+
+import api.common.pojo.constants.DictConstants;
+import api.common.util.*;
+import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
+import com.css.simulation.resource.scheduler.mapper.IndexMapper;
+import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
+import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+import com.css.simulation.resource.scheduler.pojo.po.*;
+import com.css.simulation.resource.scheduler.pojo.to.ScoreTO;
+import com.css.simulation.resource.scheduler.util.MinioUtil;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.minio.MinioClient;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.sshd.client.session.ClientSession;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+@Component
+@Slf4j
+public class TaskManager {
+    @Autowired
+    ClusterMapper clusterMapper;
+    @Autowired
+    StringRedisTemplate stringRedisTemplate;
+    @Value("${scheduler.manual-project.topic}")
+    String manualProjectTopic;
+    @Autowired
+    TaskMapper taskMapper;
+    @Value("${scheduler.manual-project.result-path-minio}")
+    String resultPathMinio;
+    @Autowired
+    MinioClient minioClient;
+    @Value("${minio.bucket-name}")
+    String bucketName;
+    @Autowired
+    ManualProjectMapper manualProjectMapper;
+    @Autowired
+    KafkaTemplate<String, String> kafkaTemplate;
+
+    @Autowired
+    TaskIndexManager taskIndexManager;
+    @Autowired
+    IndexMapper indexMapper;
+    @Value("${scheduler.score.py-path}")
+    String pyPath;
+    @Value("${scheduler.linux-temp-path}")
+    String linuxTempPath;
+    @Value("${simulation-cloud.client-id}")
+    String clientId;
+    @Value("${simulation-cloud.client-secret}")
+    String clientSecret;
+    @Value("${simulation-cloud.token-uri}")
+    String tokenUri;
+    @Value("${simulation-cloud.evaluation-level-uri}")
+    String evaluationLevelUri;
+    @Autowired
+    CloseableHttpClient closeableHttpClient;
+    @Autowired
+    RequestConfig requestConfig;
+
+
+
+    @SneakyThrows
+    public boolean isProjectCompleted(String userId, String projectId, String taskId, String state, String podName, ClientSession session) {
+
+        if ("Running".equals(state)) {
+            // 将运行中的任务的 pod 名称放入 redis
+            stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":pod", podName);
+            taskTick(taskId); // 刷新一下心跳
+            log.info("TaskManager--state 修改任务 " + taskId + "的状态为 Running,pod 名称为:" + podName);
+            taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
+            return false;
+        } else {
+            String podDeleteCommand = "kubectl delete pod " + podName;
+            log.info("TaskManager--state 修改任务 " + taskId + "的状态为:" + state + ",pod 名称为:" + podName + ",并执行删除 pod 命令:" + podDeleteCommand);
+            if ("Aborted".equals(state)) {
+                if (retry(userId, projectId, taskId)) {
+                    taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
+                    return false;
+                }
+                //result-path-minio: /project/manual-project/
+                String minioPathOfErrorLog = resultPathMinio + projectId + "/" + taskId + "error.log";
+                boolean objectExist = MinioUtil.isObjectExist(minioClient, bucketName, minioPathOfErrorLog);
+                String targetEvaluate;
+                if (objectExist) {
+                    String errorString = MinioUtil.downloadToString(minioClient, bucketName, minioPathOfErrorLog);
+                    String[] lines = errorString.split("\n");
+                    StringBuilder errorMessage = new StringBuilder();
+                    for (String line : lines) {
+                        if (line.startsWith("Original Error")) {
+                            errorMessage.append(line).append("\n");
+                        }
+                        if (line.startsWith("Possible Cause")) {
+                            errorMessage.append(line);
+                            break;
+                        }
+                    }
+                    targetEvaluate = errorMessage.toString();
+                } else {
+                    targetEvaluate = DictConstants.TASK_ERROR_REASON_2;
+                }
+                taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), targetEvaluate);
+            } else if ("Terminated".equals(state)) {
+                if (retry(userId, projectId, taskId)) {
+                    taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
+                    return false;
+                }
+                taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_3);
+            } else if ("PendingAnalysis".equals(state)) {
+                taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
+            } else {
+                if (retry(userId, projectId, taskId)) {
+                    taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
+                    return false;
+                }
+                taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_4);
+            }
+            SshUtil.execute(session, podDeleteCommand);
+        }
+        int taskNum = taskMapper.selectTaskNumByProjectId(projectId);
+        int endTaskNum = taskMapper.selectEndTaskNumByProjectId(projectId);    // 查询已结束的任务 'Aborted', 'PendingAnalysis', 'Terminated'
+        manualProjectMapper.updateTaskCompleted(projectId, endTaskNum);
+        log.info("TaskManager--state 项目 " + projectId + " 完成进度为:" + endTaskNum + "/" + taskNum);
+        // 已结束任务数等于所有任务数量,才会准备打分;否则退出。
+        return taskNum == endTaskNum;
+
+    }
+
+    public boolean retry(String userId, String projectId, String taskId) {
+        log.info("TaskService--retry 重试操作收到的参数为:userId=" + userId + ",projectId=" + projectId + ",taskId=" + taskId);
+        //1 首先查看任务是否重试过 3 次
+        String retryString = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":retry");
+        int retry = Integer.parseInt(Objects.requireNonNull(retryString));
+        //2 如果重试次数没有超过 3 次,则重试
+        if (retry > 3) {
+            return false;
+        }
+        String taskJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":message");
+        retry++;
+        log.info("TaskService--retry 重试项目 " + projectId + " 的任务 " + taskId + ",重试次数为:" + retry + ",重新发送的消息为:" + taskJson);
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":retry", retry + "");
+        kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
+            // 消息发送到的topic
+            assert success != null;
+            String topic = success.getRecordMetadata().topic();
+            // 消息发送到的分区
+            int partition = success.getRecordMetadata().partition();
+            // 消息在分区内的offset
+            long offset = success.getRecordMetadata().offset();
+            log.info("------- ManualProjectConsumer 发送消息成功:\n"
+                    + "主题 topic 为:" + topic + "\n"
+                    + "分区 partition 为:" + partition + "\n"
+                    + "偏移量为:" + offset + "\n"
+                    + "消息体为:" + taskJson);
+        }, failure -> {
+            log.error("------- 发送消息失败:" + failure.getMessage());
+        });
+        return true;
+    }
+
+    public void prepareScore(String userId, String projectId) {
+        log.info("TaskManager--prepareScore 项目 " + projectId + "准备打分!");
+        ClusterPO clusterPO = clusterMapper.selectByUserId(userId);
+        String clusterId = clusterPO.getId();
+        stringRedisTemplate.delete(manualProjectTopic + ":cluster:" + clusterId + ":running" + projectId);
+
+    }
+
+    @SneakyThrows
+    public void score(String userId, String projectId, ClientSession session){
+        // -------------------------------- 打分 --------------------------------
+        ProjectPO projectPO = manualProjectMapper.selectById(projectId);
+        String packageId = projectPO.getScenePackageId();  // 场景测试包 id,指标根 id
+        List<TaskPO> taskList = taskMapper.selectTaskListByProjectId(projectId);  // 所有任务信息
+        indexMapper.deleteFirstByProjectId(projectId);
+        indexMapper.deleteLastByProjectId(projectId);
+        //1 查询场景包对应指标
+        String allIndexTemplateListJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":package:" + packageId + ":all");
+        List<IndexTemplatePO> allIndexTemplateList = JsonUtil.jsonToList(allIndexTemplateListJson, IndexTemplatePO.class);
+        String leafIndexTemplateListJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":package:" + packageId + ":leaf");
+        List<IndexTemplatePO> leafIndexTemplateList = JsonUtil.jsonToList(leafIndexTemplateListJson, IndexTemplatePO.class);
+        log.info("TaskService--state 共有 " + leafIndexTemplateList.size() + "个叶子节点:" + leafIndexTemplateListJson);
+        int maxLevel = 1; // 用于计算指标得分
+        List<LeafIndexPO> leafIndexList = new ArrayList<>();
+        for (int i = 0; i < leafIndexTemplateList.size(); i++) {
+            AtomicReference<String> scoreExplain = new AtomicReference<>(); // 每个叶子指标下的任务的得分说明一样和叶子指标一致
+            IndexTemplatePO leafIndexTemplate = leafIndexTemplateList.get(i);
+            String indexId = leafIndexTemplate.getIndexId();
+            String parentId = leafIndexTemplate.getParentId(); // 父 id
+            String rootId = leafIndexTemplate.getRootId(); // 包 id
+            String weight = leafIndexTemplate.getWeight(); // 权重
+            Integer packageLevel = leafIndexTemplate.getPackageLevel(); // 几级指标
+            String ruleName = leafIndexTemplate.getRuleName();    // 打分脚本名称,例如 AEB_1-1
+            String ruleDetails = leafIndexTemplate.getRuleDetails();    // 打分脚本内容
+            if (packageLevel > maxLevel) {
+                maxLevel = packageLevel;
+            }
+            log.info("TaskService--state 开始执行对第 " + (i + 1) + " 个叶子节点 " + indexId + " 进行打分!");
+
+            String ruleFilePath = pyPath + "scripts/" + ruleName.split("_")[0] + "/" + ruleName + ".py";
+            log.info("TaskService--state 将叶子节点 " + indexId + " 对应的打分规则保存到临时目录:" + ruleFilePath);
+            FileUtil.writeInputStreamToLocalFile(IoUtil.stringToInputStream(ruleDetails), ruleFilePath);
+            List<TaskPO> taskListOfLeafIndex = taskList.stream()
+                    .filter(task -> indexId.equals(task.getLastTargetId()))
+                    .collect(Collectors.toList());
+            log.info("TaskService--state 叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个成功运行结束任务!");
+            log.info("TaskService--state 计算叶子节点 " + indexId + " 的得分:" + taskListOfLeafIndex);
+            taskListOfLeafIndex.forEach(taskOfLeaf -> {
+                String runState = taskOfLeaf.getRunState();
+                if (DictConstants.TASK_ANALYSIS.equals(runState)) {
+                    String task2Id = taskOfLeaf.getId();
+                    taskMapper.updateSuccessStateWithStopTime(task2Id, DictConstants.TASK_ANALYSING, TimeUtil.getNowForMysql());
+                    // 计算每个任务的得分
+                    String runResultMinio = taskOfLeaf.getRunResultFilePath() + "/Ego.csv";
+                    String runResultLinux = linuxTempPath + runResultMinio;
+                    String scoreCommand = "python3 " + pyPath + "main.py " + runResultLinux + " " + taskOfLeaf.getSceneType() + " " + ruleName; // 指定打分脚本
+                    String scoreResult;
+                    ScoreTO score;
+                    try {
+                        try {
+                            log.info("TaskService--state 下载 minio 上的结果文件 " + runResultMinio + " 到本地:" + runResultLinux);
+                            MinioUtil.downloadToFile(minioClient, bucketName, runResultMinio, runResultLinux);  // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
+                        } catch (Exception e) {
+                            throw new RuntimeException("------- TaskService--state 下载 minio 上的结果文件出错:" + e.getMessage());
+                        }
+                        try {
+                            log.info("TaskService--state 开始执行打分命令:" + scoreCommand);
+                            scoreResult = SshUtil.execute(session, scoreCommand);
+                            log.info("TaskService--state 项目" + projectId + "的任务" + task2Id + "打分结束,结果为:" + scoreResult);
+                            String replace = StringUtil.replace(scoreResult, "'", "\"");
+                            score = JsonUtil.jsonToBean(replace, ScoreTO.class);
+                        } catch (IOException e) {
+                            throw new RuntimeException("------- TaskService--state 项目" + projectId + "的任务" + task2Id + " 打分出错,命令为:" + scoreCommand + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
+                        }
+                    } catch (Exception e) {
+                        taskOfLeaf.setRunState(DictConstants.TASK_ABORTED);
+                        taskMapper.updateFailStateWithStopTime(task2Id, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_5);
+                        log.error(e.getMessage());
+                        return; // 如果打分失败则开始下一个打分
+                    }
+                    assert score != null;
+                    taskOfLeaf.setReturnSceneId(score.getUnit_scene_ID());
+                    taskOfLeaf.setScore(score.getUnit_scene_score());
+                    taskOfLeaf.setTargetEvaluate(score.getEvaluate_item());
+                    taskOfLeaf.setScoreExplain(score.getScore_description());
+                    taskOfLeaf.setModifyUserId(userId);
+                    taskOfLeaf.setModifyTime(TimeUtil.getNowForMysql());
+                    scoreExplain.set(score.getScore_description());
+
+                    taskOfLeaf.setRunState(DictConstants.TASK_COMPLETED);
+                    taskMapper.updateSuccessStateAndScoreResultWithStopTime(
+                            taskOfLeaf,
+                            DictConstants.TASK_COMPLETED,
+                            TimeUtil.getNowForMysql()
+                    );
+                }
+            });
+
+
+            // 计算不合格的任务数(不到100分就是不合格,执行失败的不算)
+            long notStandardSceneNumber = taskListOfLeafIndex.stream()
+                    .filter(task -> task.getScore() < 100)
+                    .count();
+
+            // 计算叶子指标下任务得分总和
+            double leafSum = taskListOfLeafIndex.stream()
+                    .mapToDouble(TaskPO::getScore)
+                    .sum();
+            // 计算任务的个数
+            long resultNumberOfCurrentIndex = taskListOfLeafIndex.size();
+            log.info("TaskService--state 项目 " + projectId + " 的叶子指标" + indexId + "下成功执行的场景数量为:" + resultNumberOfCurrentIndex);
+            // 计算叶子指标得分(任务得分总和 / 任务数量)
+            double leafIndexScore = resultNumberOfCurrentIndex == 0 ? 0 : NumberUtil.cut(leafSum / resultNumberOfCurrentIndex, 2);
+            // 创建叶子指标对象
+            leafIndexTemplate.setTempScore(leafIndexScore);
+
+            LeafIndexPO leafIndex = LeafIndexPO.builder()
+                    .id(StringUtil.getRandomUUID())
+                    .pId(projectId)
+                    .target(leafIndexTemplate.getIndexId())
+                    .notStandardSceneNum((int) notStandardSceneNumber)
+                    .score(leafIndexScore)
+                    .indexId(indexId)
+                    .parentId(parentId)
+                    .rootId(rootId)
+                    .weight(weight)
+                    .scoreExplain(scoreExplain.get())
+                    .packageLevel(packageLevel)
+                    .build();
+            leafIndex.setCreateUserId(userId);
+            leafIndex.setCreateTime(TimeUtil.getNowForMysql());
+            leafIndex.setModifyUserId(userId);
+            leafIndex.setModifyTime(TimeUtil.getNowForMysql());
+            leafIndex.setIsDeleted("0");
+
+            leafIndexList.add(leafIndex);
+        }
+        // 保存叶子指标得分
+        taskIndexManager.batchInsertLeafIndex(leafIndexList);
+        // 保存一级指标分数
+        log.info("TaskManager--score 项目 " + projectId + " 的所有任务分数为:" + taskList);
+        computeFirst(leafIndexList, allIndexTemplateList, projectId, maxLevel);
+
+        // 调用 server 的接口,计算评价等级
+        evaluationLevel(projectId);
+        log.info("TaskManager--score 项目 " + projectId + " 打分完成!");
+        manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成
+        log.info("TaskManager--score 项目 " + projectId + " 执行完成!");
+
+    }
+
+    public void computeFirst(List<LeafIndexPO> leafIndexList, List<IndexTemplatePO> allIndexTemplateList, String projectId, int maxLevel) {
+
+        log.info("------- /state computeFirst 计算父指标得分:" + leafIndexList);
+        Iterator<LeafIndexPO> leafTaskIndexIterator = leafIndexList.iterator();
+        // 把 1 级的指标得分直接保存
+        while (leafTaskIndexIterator.hasNext()) {
+            LeafIndexPO leafTaskIndex = leafTaskIndexIterator.next();
+            if (leafTaskIndex.getPackageLevel() == 1) {
+                leafTaskIndex.setCreateUserId(leafTaskIndex.getCreateUserId());
+                leafTaskIndex.setCreateTime(TimeUtil.getNowForMysql());
+                leafTaskIndex.setModifyUserId(leafTaskIndex.getModifyUserId());
+                leafTaskIndex.setModifyTime(TimeUtil.getNowForMysql());
+                leafTaskIndex.setIsDeleted("0");
+                indexMapper.insertFirstIndex(leafTaskIndex);
+                leafTaskIndexIterator.remove();
+            }
+        }
+        if (leafIndexList.size() > 0) {
+            List<LeafIndexPO> nextLevelIndexList = new ArrayList<>();
+            // 找出等级和 maxLevel 不相同的指标暂时不计算
+            leafIndexList.stream()
+                    .filter(po -> maxLevel != po.getPackageLevel())
+                    .forEach(nextLevelIndexList::add);
+            // 找出等级和 maxLevel 相同的指标并根据父指标分组
+            Map<String, List<LeafIndexPO>> sonTaskIndexMap = leafIndexList.stream()
+                    .filter(po -> maxLevel == po.getPackageLevel())
+                    .collect(Collectors.groupingBy(LeafIndexPO::getParentId));
+            Set<String> parentIdSet = sonTaskIndexMap.keySet();
+            List<String> parentIdList = CollectionUtil.setToList(parentIdSet);
+
+            List<IndexTemplatePO> parentIndexTemplateList = allIndexTemplateList.stream()
+                    .filter(indexTemplate -> parentIdList.contains(indexTemplate.getIndexId()))
+                    .collect(Collectors.toList());
+            // 计算父指标得分
+            parentIndexTemplateList.forEach(indexTemplate -> {
+                String weight = indexTemplate.getWeight();
+                List<LeafIndexPO> sonTaskIndexList = sonTaskIndexMap.get(indexTemplate.getIndexId());
+                double parentScore = sonTaskIndexList.stream().mapToDouble(taskIndex -> taskIndex.getScore() * Double.parseDouble(taskIndex.getWeight()) / 100).sum();
+                LeafIndexPO parentTaskIndex = LeafIndexPO.builder()
+                        .id(StringUtil.getRandomUUID())
+                        .pId(projectId)
+                        .target(indexTemplate.getIndexId())
+                        .score(parentScore)
+                        .indexId(indexTemplate.getIndexId())
+                        .parentId(indexTemplate.getParentId())
+                        .rootId(indexTemplate.getRootId())
+                        .weight(weight)
+                        .packageLevel(maxLevel - 1)
+                        .build();
+                nextLevelIndexList.add(parentTaskIndex);
+            });
+            // 将父指标作为叶子指标递归
+            computeFirst(nextLevelIndexList, allIndexTemplateList, projectId, maxLevel - 1);
+        }
+    }
+
+    @SneakyThrows
+    public void evaluationLevel(String projectId) {
+        String tokenUrl = tokenUri + "?grant_type=client_credentials"
+                + "&client_id=" + clientId
+                + "&client_secret=" + clientSecret;
+        log.info("TaskService--state 获取仿真云平台 token:" + tokenUrl);
+        String response = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode jsonNode = objectMapper.readTree(response);
+        String accessToken = jsonNode.path("access_token").asText();
+        log.info("TaskService--state 仿真云平台 token 为:" + accessToken);
+        Map<String, String> headers = new HashMap<>();
+        headers.put("Authorization", "Bearer " + accessToken);
+        Map<String, String> params = new HashMap<>();
+        params.put("id", projectId);
+        String post = HttpUtil.post(closeableHttpClient, requestConfig, evaluationLevelUri, headers, params);
+        log.info("TaskService--state 访问仿真云平台评价等级接口:" + evaluationLevelUri + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
+    }
+
+
+
+
+
+    public Boolean taskConfirm(String taskId) {
+        // 查询 task 如果不是 pending 则不执行
+        String state = taskMapper.selectStateById(taskId);
+        return DictConstants.TASK_PENDING.equals(state);
+    }
+
+    public void taskTick(String taskId) {
+        log.info("TaskService--taskTick 任务 " + taskId + "心跳!");
+        TaskPO taskPO = taskMapper.selectById(taskId);
+        String projectId = taskPO.getPId();
+        String userId = taskPO.getCreateUserId();
+        // 刷新 redis 心跳时间
+        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":tick", TimeUtil.getNowString());
+    }
+
+
+}

+ 4 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/UserMapper.java

@@ -12,11 +12,13 @@ public interface UserMapper {
     @Results(id = "user", value = {
             @Result(column = "id", property = "id", jdbcType = JdbcType.VARCHAR),
             @Result(column = "role_code", property = "roleCode", jdbcType = JdbcType.VARCHAR),
-            @Result(column = "use_type", property = "useType", jdbcType = JdbcType.VARCHAR)
+            @Result(column = "use_type", property = "useType", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "create_user_id", property = "createUserId", jdbcType = JdbcType.VARCHAR)
     })
     @Select("select id,\n" +
             "       role_code,\n" +
-            "       use_type\n" +
+            "       use_type,\n" +
+            "       create_user_id\n" +
             "from system_user\n" +
             "where id = #{userId}")
     UserPO selectById(@Param("userId")String userId);

+ 1 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/po/UserPO.java

@@ -11,4 +11,5 @@ public class UserPO {
     private String id; // id
     private String roleCode; // 角色
     private String useType; // 占用类型
+    private String createUserId; // 占用类型
 }

+ 42 - 31
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -1,11 +1,15 @@
 package com.css.simulation.resource.scheduler.scheduler;
 
 import api.common.pojo.constants.DictConstants;
+import api.common.util.CollectionUtil;
 import api.common.util.SshUtil;
 import api.common.util.StringUtil;
 import api.common.util.TimeUtil;
+import com.css.simulation.resource.scheduler.consumer.ManualProjectConsumer;
+import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+import com.css.simulation.resource.scheduler.pojo.po.ClusterPO;
 import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.service.TaskService;
@@ -22,6 +26,7 @@ import org.springframework.stereotype.Component;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 
 @Component
 @Slf4j
@@ -36,6 +41,8 @@ public class ProjectScheduler {
     @Autowired
     TaskMapper taskMapper;
     @Autowired
+    ClusterMapper clusterMapper;
+    @Autowired
     ManualProjectMapper manualProjectMapper;
     @Value("${scheduler.manual-project.job-yaml}")
     String jobYaml;
@@ -49,39 +56,43 @@ public class ProjectScheduler {
     ApiClient apiClient;
     @Autowired
     KafkaTemplate<String, String> kafkaTemplate;
+    @Autowired
+    ManualProjectConsumer manualProjectConsumer;
 
 
-//    /**
-//     * 调度项目启动
-//     * @throws IOException 超时时间
-//     */
-//    @Scheduled(fixedDelay = 60 * 1000)
-//    public void dispatchProject() throws IOException {
-//        //1 等待执行的项目
-//
-//
-//
-//        long timeout = 2 * 60 * 1000L;
-//
-//        SshClient client = SshUtil.getClient();
-//        ClientSession session = SshUtil.getSession(client, hostname, username, password);
-//        List<TaskPO> executingTaskList = taskMapper.selectExecuting();
-//
-//        log.info("ProjectScheduler--taskTimeout 正在运行的任务有:" + executingTaskList);
-//        if (executingTaskList != null && executingTaskList.size() > 0) {
-//            for (TaskPO task : executingTaskList) {
-//                String taskId = task.getId();
-//                String projectId = task.getPId();
-//                long lastTickTime = Long.parseLong(Objects.requireNonNull(redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick")));
-//                if (TimeUtil.getNow() - lastTickTime > timeout) {
-//                    String podName = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod");
-//                    taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
-//                }
-//            }
-//        }
-//        session.close();
-//        client.stop();
-//    }
+    /**
+     * 调度项目启动
+     *
+     * @throws IOException 超时时间
+     */
+    @Scheduled(fixedDelay = 60 * 1000)
+    public void dispatchProject() throws IOException {
+        //1 查询等待执行的项目
+        List<ProjectPO> projectList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_WAITING);
+        //2
+        if (CollectionUtil.isEmpty(projectList)) {
+            return;
+        }
+        projectList.forEach(project -> {
+            String projectId = project.getId();
+            String userId = project.getCreateUserId();
+            ClusterPO clusterPO = clusterMapper.selectByUserId(userId);
+            String clusterId = clusterPO.getId();
+            int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
+            // 获取该用户正在运行的项目数量
+            Set<String> runningProjectSet = redisTemplate.keys(manualProjectTopic + ":cluster:" + clusterId + ":running" + "*");
+            int runningProjectNumber = CollectionUtil.isEmpty(runningProjectSet) ? 0 : runningProjectSet.size();
+            if (runningProjectNumber < simulationLicenseNumber) {
+                String projectJson = redisTemplate.opsForValue().get(manualProjectTopic + ":cluster:" + clusterId + ":waiting" + projectId);
+                assert projectJson != null;
+                redisTemplate.delete(manualProjectTopic + ":cluster:" + clusterId + ":waiting" + projectId);
+                redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":running" + projectId, projectJson);
+                manualProjectConsumer.parseManualProject(projectJson);
+            }
+        });
+
+
+    }
 
 
     /**

+ 19 - 371
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -1,68 +1,33 @@
 package com.css.simulation.resource.scheduler.service;
 
-import api.common.pojo.constants.DictConstants;
-import api.common.util.*;
-import com.css.simulation.resource.scheduler.manager.TaskIndexManager;
-import com.css.simulation.resource.scheduler.mapper.IndexMapper;
+import api.common.util.SshUtil;
+import com.css.simulation.resource.scheduler.manager.TaskManager;
 import com.css.simulation.resource.scheduler.mapper.IndexTemplateMapper;
-import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-import com.css.simulation.resource.scheduler.pojo.po.IndexTemplatePO;
-import com.css.simulation.resource.scheduler.pojo.po.LeafIndexPO;
-import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
-import com.css.simulation.resource.scheduler.pojo.to.ScoreTO;
-import com.css.simulation.resource.scheduler.util.MinioUtil;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
 @Service
 @Slf4j
 public class TaskService {
 
-    @Autowired
-    CloseableHttpClient closeableHttpClient;
-    @Autowired
-    RequestConfig requestConfig;
-    @Autowired
-    MinioClient minioClient;
-    @Value("${minio.bucket-name}")
-    String bucketName;
+
     @Autowired
     StringRedisTemplate stringRedisTemplate;
     @Autowired
-    ManualProjectMapper manualProjectMapper;
-    //    @Autowired
-//    TaskManager taskManager;
-    @Autowired
-    TaskMapper taskMapper;
-    @Autowired
-    TaskIndexManager taskIndexManager;
-    @Autowired
-    IndexMapper indexMapper;
+    TaskManager taskManager;
     @Autowired
     IndexTemplateMapper indexTemplateMapper;
     @Value("${scheduler.manual-project.topic}")
     String manualProjectTopic;
-    @Value("${scheduler.manual-project.result-path-minio}")
-    String resultPathMinio;
     @Value("${scheduler.score.hostname}")
     String hostname;
     @Value("${scheduler.score.username}")
@@ -71,239 +36,39 @@ public class TaskService {
     String password;
     @Value("${spring.kafka.delete-command}")
     String kafkaDeleteCommand;
-    @Value("${scheduler.score.py-path}")
-    String pyPath;
-    @Value("${scheduler.linux-temp-path}")
-    String linuxTempPath;
-    @Value("${simulation-cloud.client-id}")
-    String clientId;
-    @Value("${simulation-cloud.client-secret}")
-    String clientSecret;
-    @Value("${simulation-cloud.token-uri}")
-    String tokenUri;
-    @Value("${simulation-cloud.evaluation-level-uri}")
-    String evaluationLevelUri;
     @Autowired
-    KafkaTemplate<String, String> kafkaTemplate;
+    MinioClient minioClient;
+    @Value("${minio.bucket-name}")
+    String bucketName;
+    @Autowired
+    TaskMapper taskMapper;
 
 
     @SneakyThrows
     public void taskState(String taskId, String state, String podName) {
         log.info("TaskService--state 接收到参数为:taskId=" + taskId + ",state=" + state + ",podName=" + podName);
-
         TaskPO taskPO = taskMapper.selectById(taskId);
         if (taskPO == null) {
-            log.error("TaskService--state 接收到已删除但还在执行的任务:taskId=" + taskId + ",state=" + state + ",podName=" + podName);
+            log.error("TaskManager--isProjectCompleted 接收到已删除但还在执行的任务:taskId=" + taskId + ",state=" + state + ",podName=" + podName);
             return;
         }
         String projectId = taskPO.getPId();
         String userId = taskPO.getCreateUserId();
         SshClient client = SshUtil.getClient();
         ClientSession session = SshUtil.getSession(client, hostname, username, password);
-        if ("Running".equals(state)) {
-            // 将运行中的任务的 pod 名称放入 redis
-            stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":pod", podName);
-            taskTick(taskId); // 刷新一下心跳
-            log.info("TaskService--state 修改任务 " + taskId + "的状态为 Running,pod 名称为:" + podName);
-            taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
-            return;
-        } else {
-            String podDeleteCommand = "kubectl delete pod " + podName;
-            log.info("TaskService--state 修改任务 " + taskId + "的状态为:" + state + ",pod 名称为:" + podName + ",并执行删除 pod 命令:" + podDeleteCommand);
-            if ("Aborted".equals(state)) {
-                if (retry(userId, projectId, taskId)) {
-                    taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
-                    return;
-                }
-                //result-path-minio: /project/manual-project/
-                String minioPathOfErrorLog = resultPathMinio + projectId + "/" + taskId + "error.log";
-                boolean objectExist = MinioUtil.isObjectExist(minioClient, bucketName, minioPathOfErrorLog);
-                String targetEvaluate;
-                if (objectExist) {
-                    String errorString = MinioUtil.downloadToString(minioClient, bucketName, minioPathOfErrorLog);
-                    String[] lines = errorString.split("\n");
-                    StringBuilder errorMessage = new StringBuilder();
-                    for (String line : lines) {
-                        if (line.startsWith("Original Error")) {
-                            errorMessage.append(line).append("\n");
-                        }
-                        if (line.startsWith("Possible Cause")) {
-                            errorMessage.append(line);
-                            break;
-                        }
-                    }
-                    targetEvaluate = errorMessage.toString();
-                } else {
-                    targetEvaluate = DictConstants.TASK_ERROR_REASON_2;
-                }
-                taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), targetEvaluate);
-            } else if ("Terminated".equals(state)) {
-                if (retry(userId, projectId, taskId)) {
-                    taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
-                    return;
-                }
-                taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_3);
-            } else if ("PendingAnalysis".equals(state)) {
-                taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
-            } else {
-                if (retry(userId, projectId, taskId)) {
-                    taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
-                    return;
-                }
-                taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_4);
-            }
-            SshUtil.execute(session, podDeleteCommand);
-        }
-        int taskNum = taskMapper.selectTaskNumByProjectId(projectId);
-        int endTaskNum = taskMapper.selectEndTaskNumByProjectId(projectId);    // 查询已结束的任务 'Aborted', 'PendingAnalysis', 'Terminated'
-        manualProjectMapper.updateTaskCompleted(projectId, endTaskNum);
-        log.info("TaskService--state 项目 " + projectId + " 完成进度为:" + endTaskNum + "/" + taskNum);
-        if (taskNum != endTaskNum) {  // 已结束任务数等于所有任务数量,才会准备打分;否则退出。
+        //1 判断项目是否已完成
+        boolean projectCompleted = taskManager.isProjectCompleted(userId, projectId, taskId, state, podName, session);
+        if (!projectCompleted) {
             session.close();
             client.stop();
             return;
         }
 
-        // -------------------------------- 打分 --------------------------------
-        log.info("TaskService--state 项目 " + projectId + "准备打分!");
-        ProjectPO projectPO = manualProjectMapper.selectById(projectId);
-        String packageId = projectPO.getScenePackageId();  // 场景测试包 id,指标根 id
-        List<TaskPO> taskList = taskMapper.selectTaskListByProjectId(projectId);  // 所有任务信息
-        indexMapper.deleteFirstByProjectId(projectId);
-        indexMapper.deleteLastByProjectId(projectId);
-        //1 查询场景包对应指标
-        String allIndexTemplateListJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":package:" + packageId + ":all");
-        List<IndexTemplatePO> allIndexTemplateList = JsonUtil.jsonToList(allIndexTemplateListJson, IndexTemplatePO.class);
-        String leafIndexTemplateListJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":package:" + packageId + ":leaf");
-        List<IndexTemplatePO> leafIndexTemplateList = JsonUtil.jsonToList(leafIndexTemplateListJson, IndexTemplatePO.class);
-        log.info("TaskService--state 共有 " + leafIndexTemplateList.size() + "个叶子节点:" + leafIndexTemplateListJson);
-        int maxLevel = 1; // 用于计算指标得分
-        List<LeafIndexPO> leafIndexList = new ArrayList<>();
-        for (int i = 0; i < leafIndexTemplateList.size(); i++) {
-            AtomicReference<String> scoreExplain = new AtomicReference<>(); // 每个叶子指标下的任务的得分说明一样和叶子指标一致
-            IndexTemplatePO leafIndexTemplate = leafIndexTemplateList.get(i);
-            String indexId = leafIndexTemplate.getIndexId();
-            String parentId = leafIndexTemplate.getParentId(); // 父 id
-            String rootId = leafIndexTemplate.getRootId(); // 包 id
-            String weight = leafIndexTemplate.getWeight(); // 权重
-            Integer packageLevel = leafIndexTemplate.getPackageLevel(); // 几级指标
-            String ruleName = leafIndexTemplate.getRuleName();    // 打分脚本名称,例如 AEB_1-1
-            String ruleDetails = leafIndexTemplate.getRuleDetails();    // 打分脚本内容
-            if (packageLevel > maxLevel) {
-                maxLevel = packageLevel;
-            }
-            log.info("TaskService--state 开始执行对第 " + (i + 1) + " 个叶子节点 " + indexId + " 进行打分!");
-
-            String ruleFilePath = pyPath + "scripts/" + ruleName.split("_")[0] + "/" + ruleName + ".py";
-            log.info("TaskService--state 将叶子节点 " + indexId + " 对应的打分规则保存到临时目录:" + ruleFilePath);
-            FileUtil.writeInputStreamToLocalFile(IoUtil.stringToInputStream(ruleDetails), ruleFilePath);
-            List<TaskPO> taskListOfLeafIndex = taskList.stream()
-                    .filter(task -> indexId.equals(task.getLastTargetId()))
-                    .collect(Collectors.toList());
-            log.info("TaskService--state 叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个成功运行结束任务!");
-            log.info("TaskService--state 计算叶子节点 " + indexId + " 的得分:" + taskListOfLeafIndex);
-            taskListOfLeafIndex.forEach(taskOfLeaf -> {
-                String runState = taskOfLeaf.getRunState();
-                if (DictConstants.TASK_ANALYSIS.equals(runState)) {
-                    String task2Id = taskOfLeaf.getId();
-                    taskMapper.updateSuccessStateWithStopTime(task2Id, DictConstants.TASK_ANALYSING, TimeUtil.getNowForMysql());
-                    // 计算每个任务的得分
-                    String runResultMinio = taskOfLeaf.getRunResultFilePath() + "/Ego.csv";
-                    String runResultLinux = linuxTempPath + runResultMinio;
-                    String scoreCommand = "python3 " + pyPath + "main.py " + runResultLinux + " " + taskOfLeaf.getSceneType() + " " + ruleName; // 指定打分脚本
-                    String scoreResult;
-                    ScoreTO score;
-                    try {
-                        try {
-                            log.info("TaskService--state 下载 minio 上的结果文件 " + runResultMinio + " 到本地:" + runResultLinux);
-                            MinioUtil.downloadToFile(minioClient, bucketName, runResultMinio, runResultLinux);  // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
-                        } catch (Exception e) {
-                            throw new RuntimeException("------- TaskService--state 下载 minio 上的结果文件出错:" + e.getMessage());
-                        }
-                        try {
-                            log.info("TaskService--state 开始执行打分命令:" + scoreCommand);
-//                            scoreResult = SshUtil.execute(sessionScore, command);
-                            scoreResult = SshUtil.execute(session, scoreCommand);
-                            log.info("TaskService--state 项目" + projectId + "的任务" + task2Id + "打分结束,结果为:" + scoreResult);
-                            String replace = StringUtil.replace(scoreResult, "'", "\"");
-                            score = JsonUtil.jsonToBean(replace, ScoreTO.class);
-                        } catch (IOException e) {
-                            throw new RuntimeException("------- TaskService--state 项目" + projectId + "的任务" + task2Id + " 打分出错,命令为:" + scoreCommand + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
-                        }
-                    } catch (Exception e) {
-                        taskOfLeaf.setRunState(DictConstants.TASK_ABORTED);
-                        taskMapper.updateFailStateWithStopTime(task2Id, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_5);
-                        log.error(e.getMessage());
-                        return; // 如果打分失败则开始下一个打分
-                    }
-                    assert score != null;
-                    taskOfLeaf.setReturnSceneId(score.getUnit_scene_ID());
-                    taskOfLeaf.setScore(score.getUnit_scene_score());
-                    taskOfLeaf.setTargetEvaluate(score.getEvaluate_item());
-                    taskOfLeaf.setScoreExplain(score.getScore_description());
-                    taskOfLeaf.setModifyUserId(userId);
-                    taskOfLeaf.setModifyTime(TimeUtil.getNowForMysql());
-                    scoreExplain.set(score.getScore_description());
-
-                    taskOfLeaf.setRunState(DictConstants.TASK_COMPLETED);
-                    taskMapper.updateSuccessStateAndScoreResultWithStopTime(
-                            taskOfLeaf,
-                            DictConstants.TASK_COMPLETED,
-                            TimeUtil.getNowForMysql()
-                    );
-                }
-            });
-
+        //2 准备打分
+        taskManager.prepareScore(userId, projectId);
 
-            // 计算不合格的任务数(不到100分就是不合格,执行失败的不算)
-            long notStandardSceneNumber = taskListOfLeafIndex.stream()
-                    .filter(task -> task.getScore() < 100)
-                    .count();
-
-            // 计算叶子指标下任务得分总和
-            double leafSum = taskListOfLeafIndex.stream()
-                    .mapToDouble(TaskPO::getScore)
-                    .sum();
-            // 计算任务的个数
-            long resultNumberOfCurrentIndex = taskListOfLeafIndex.size();
-            log.info("TaskService--state 项目 " + projectId + " 的叶子指标" + indexId + "下成功执行的场景数量为:" + resultNumberOfCurrentIndex);
-            // 计算叶子指标得分(任务得分总和 / 任务数量)
-            double leafIndexScore = resultNumberOfCurrentIndex == 0 ? 0 : NumberUtil.cut(leafSum / resultNumberOfCurrentIndex, 2);
-            // 创建叶子指标对象
-            leafIndexTemplate.setTempScore(leafIndexScore);
-
-            LeafIndexPO leafIndex = LeafIndexPO.builder()
-                    .id(StringUtil.getRandomUUID())
-                    .pId(projectId)
-                    .target(leafIndexTemplate.getIndexId())
-                    .notStandardSceneNum((int) notStandardSceneNumber)
-                    .score(leafIndexScore)
-                    .indexId(indexId)
-                    .parentId(parentId)
-                    .rootId(rootId)
-                    .weight(weight)
-                    .scoreExplain(scoreExplain.get())
-                    .packageLevel(packageLevel)
-                    .build();
-            leafIndex.setCreateUserId(userId);
-            leafIndex.setCreateTime(TimeUtil.getNowForMysql());
-            leafIndex.setModifyUserId(userId);
-            leafIndex.setModifyTime(TimeUtil.getNowForMysql());
-            leafIndex.setIsDeleted("0");
-
-            leafIndexList.add(leafIndex);
-        }
-        // 保存叶子指标得分
-        taskIndexManager.batchInsertLeafIndex(leafIndexList);
-        // 保存一级指标分数
-        log.info("TaskService--state 项目 " + projectId + " 的所有任务分数为:" + taskList);
-        computeFirst(leafIndexList, allIndexTemplateList, projectId, maxLevel);
-
-        // 调用 server 的接口,计算评价等级
-        evaluationLevel(projectId);
-        log.info("TaskService--state 项目 " + projectId + " 打分完成!");
-        manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成
-        log.info("TaskService--state 项目 " + projectId + " 执行完成!");
+        //3 打分
+        taskManager.score(userId, projectId,session);
 
 
         // -------------------------------- 收尾 --------------------------------
@@ -330,129 +95,12 @@ public class TaskService {
 
     }
 
-
-    public boolean retry(String userId, String projectId, String taskId) {
-        log.info("TaskService--retry 重试操作收到的参数为:userId=" + userId + ",projectId=" + projectId + ",taskId=" + taskId);
-        //1 首先查看任务是否重试过 3 次
-        String retryString = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":retry");
-        int retry = Integer.parseInt(Objects.requireNonNull(retryString));
-        //2 如果重试次数没有超过 3 次,则重试
-        if (retry > 3) {
-            return false;
-        }
-        String taskJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":message");
-        retry++;
-        log.info("TaskService--retry 重试项目 " + projectId + " 的任务 " + taskId + ",重试次数为:" + retry + ",重新发送的消息为:" + taskJson);
-        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":retry", retry + "");
-        kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
-            // 消息发送到的topic
-            assert success != null;
-            String topic = success.getRecordMetadata().topic();
-            // 消息发送到的分区
-            int partition = success.getRecordMetadata().partition();
-            // 消息在分区内的offset
-            long offset = success.getRecordMetadata().offset();
-            log.info("------- ManualProjectConsumer 发送消息成功:\n"
-                    + "主题 topic 为:" + topic + "\n"
-                    + "分区 partition 为:" + partition + "\n"
-                    + "偏移量为:" + offset + "\n"
-                    + "消息体为:" + taskJson);
-        }, failure -> {
-            log.error("------- 发送消息失败:" + failure.getMessage());
-        });
-        return true;
-    }
-
-
-    public void computeFirst(List<LeafIndexPO> leafIndexList, List<IndexTemplatePO> allIndexTemplateList, String projectId, int maxLevel) {
-
-        log.info("------- /state computeFirst 计算父指标得分:" + leafIndexList);
-        Iterator<LeafIndexPO> leafTaskIndexIterator = leafIndexList.iterator();
-        // 把 1 级的指标得分直接保存
-        while (leafTaskIndexIterator.hasNext()) {
-            LeafIndexPO leafTaskIndex = leafTaskIndexIterator.next();
-            if (leafTaskIndex.getPackageLevel() == 1) {
-                leafTaskIndex.setCreateUserId(leafTaskIndex.getCreateUserId());
-                leafTaskIndex.setCreateTime(TimeUtil.getNowForMysql());
-                leafTaskIndex.setModifyUserId(leafTaskIndex.getModifyUserId());
-                leafTaskIndex.setModifyTime(TimeUtil.getNowForMysql());
-                leafTaskIndex.setIsDeleted("0");
-                indexMapper.insertFirstIndex(leafTaskIndex);
-                leafTaskIndexIterator.remove();
-            }
-        }
-        if (leafIndexList.size() > 0) {
-            List<LeafIndexPO> nextLevelIndexList = new ArrayList<>();
-            // 找出等级和 maxLevel 不相同的指标暂时不计算
-            leafIndexList.stream()
-                    .filter(po -> maxLevel != po.getPackageLevel())
-                    .forEach(nextLevelIndexList::add);
-            // 找出等级和 maxLevel 相同的指标并根据父指标分组
-            Map<String, List<LeafIndexPO>> sonTaskIndexMap = leafIndexList.stream()
-                    .filter(po -> maxLevel == po.getPackageLevel())
-                    .collect(Collectors.groupingBy(LeafIndexPO::getParentId));
-            Set<String> parentIdSet = sonTaskIndexMap.keySet();
-            List<String> parentIdList = CollectionUtil.setToList(parentIdSet);
-
-            List<IndexTemplatePO> parentIndexTemplateList = allIndexTemplateList.stream()
-                    .filter(indexTemplate -> parentIdList.contains(indexTemplate.getIndexId()))
-                    .collect(Collectors.toList());
-            // 计算父指标得分
-            parentIndexTemplateList.forEach(indexTemplate -> {
-                String weight = indexTemplate.getWeight();
-                List<LeafIndexPO> sonTaskIndexList = sonTaskIndexMap.get(indexTemplate.getIndexId());
-                double parentScore = sonTaskIndexList.stream().mapToDouble(taskIndex -> taskIndex.getScore() * Double.parseDouble(taskIndex.getWeight()) / 100).sum();
-                LeafIndexPO parentTaskIndex = LeafIndexPO.builder()
-                        .id(StringUtil.getRandomUUID())
-                        .pId(projectId)
-                        .target(indexTemplate.getIndexId())
-                        .score(parentScore)
-                        .indexId(indexTemplate.getIndexId())
-                        .parentId(indexTemplate.getParentId())
-                        .rootId(indexTemplate.getRootId())
-                        .weight(weight)
-                        .packageLevel(maxLevel - 1)
-                        .build();
-                nextLevelIndexList.add(parentTaskIndex);
-            });
-            // 将父指标作为叶子指标递归
-            computeFirst(nextLevelIndexList, allIndexTemplateList, projectId, maxLevel - 1);
-        }
-    }
-
-    @SneakyThrows
-    public void evaluationLevel(String projectId) {
-        String tokenUrl = tokenUri + "?grant_type=client_credentials"
-                + "&client_id=" + clientId
-                + "&client_secret=" + clientSecret;
-        log.info("TaskService--state 获取仿真云平台 token:" + tokenUrl);
-        String response = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
-        ObjectMapper objectMapper = new ObjectMapper();
-        JsonNode jsonNode = objectMapper.readTree(response);
-        String accessToken = jsonNode.path("access_token").asText();
-        log.info("TaskService--state 仿真云平台 token 为:" + accessToken);
-        Map<String, String> headers = new HashMap<>();
-        headers.put("Authorization", "Bearer " + accessToken);
-        Map<String, String> params = new HashMap<>();
-        params.put("id", projectId);
-        String post = HttpUtil.post(closeableHttpClient, requestConfig, evaluationLevelUri, headers, params);
-        log.info("TaskService--state 访问仿真云平台评价等级接口:" + evaluationLevelUri + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
-    }
-
-
     public Boolean taskConfirm(String taskId) {
-        // 查询 task 如果不是 pending 则不执行
-        String state = taskMapper.selectStateById(taskId);
-        return DictConstants.TASK_PENDING.equals(state);
+        return taskManager.taskConfirm(taskId);
     }
 
     public void taskTick(String taskId) {
-        log.info("TaskService--taskTick 任务 " + taskId + "心跳!");
-        TaskPO taskPO = taskMapper.selectById(taskId);
-        String projectId = taskPO.getPId();
-        String userId = taskPO.getCreateUserId();
-        // 刷新 redis 心跳时间
-        stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + userId + ":" + projectId + ":task:" + taskId + ":tick", TimeUtil.getNowString());
+        taskManager.taskTick(taskId);
     }