|
@@ -4,13 +4,13 @@ import api.common.pojo.constants.DictConstants;
|
|
|
import api.common.util.*;
|
|
|
import com.css.simulation.resource.scheduler.manager.TaskIndexManager;
|
|
|
import com.css.simulation.resource.scheduler.manager.TaskManager;
|
|
|
+import com.css.simulation.resource.scheduler.mapper.IndexMapper;
|
|
|
import com.css.simulation.resource.scheduler.mapper.IndexTemplateMapper;
|
|
|
import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
|
|
|
-import com.css.simulation.resource.scheduler.mapper.TaskIndexMapper;
|
|
|
import com.css.simulation.resource.scheduler.mapper.TaskMapper;
|
|
|
import com.css.simulation.resource.scheduler.pojo.po.IndexTemplatePO;
|
|
|
+import com.css.simulation.resource.scheduler.pojo.po.LeafIndexPO;
|
|
|
import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
|
|
|
-import com.css.simulation.resource.scheduler.pojo.po.TaskIndexPO;
|
|
|
import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
|
|
|
import com.css.simulation.resource.scheduler.pojo.to.ScoreTO;
|
|
|
import com.css.simulation.resource.scheduler.util.MinioUtil;
|
|
@@ -26,6 +26,7 @@ import org.apache.sshd.client.session.ClientSession;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.io.IOException;
|
|
@@ -49,7 +50,7 @@ public class TaskService {
|
|
|
@Value("${minio.bucket-name}")
|
|
|
String bucketName;
|
|
|
@Autowired
|
|
|
- StringRedisTemplate redisTemplate;
|
|
|
+ StringRedisTemplate stringRedisTemplate;
|
|
|
@Autowired
|
|
|
ManualProjectMapper manualProjectMapper;
|
|
|
// @Autowired
|
|
@@ -61,7 +62,7 @@ public class TaskService {
|
|
|
@Autowired
|
|
|
TaskManager taskManager;
|
|
|
@Autowired
|
|
|
- TaskIndexMapper taskIndexMapper;
|
|
|
+ IndexMapper indexMapper;
|
|
|
@Autowired
|
|
|
IndexTemplateMapper indexTemplateMapper;
|
|
|
@Value("${scheduler.manual-project.topic}")
|
|
@@ -69,18 +70,11 @@ public class TaskService {
|
|
|
@Value("${scheduler.manual-project.result-path-minio}")
|
|
|
String resultPathMinio;
|
|
|
@Value("${scheduler.score.hostname}")
|
|
|
- String hostnameScore;
|
|
|
+ String hostname;
|
|
|
@Value("${scheduler.score.username}")
|
|
|
- String usernameScore;
|
|
|
+ String username;
|
|
|
@Value("${scheduler.score.password}")
|
|
|
- String passwordScore;
|
|
|
-
|
|
|
- @Value("${spring.kafka.hostname}")
|
|
|
- String hostnameKafka;
|
|
|
- @Value("${spring.kafka.username}")
|
|
|
- String usernameKafka;
|
|
|
- @Value("${spring.kafka.password}")
|
|
|
- String passwordKafka;
|
|
|
+ String password;
|
|
|
@Value("${spring.kafka.delete-command}")
|
|
|
String kafkaDeleteCommand;
|
|
|
@Value("${scheduler.score.py-path}")
|
|
@@ -95,74 +89,60 @@ public class TaskService {
|
|
|
String tokenUri;
|
|
|
@Value("${simulation-cloud.evaluation-level-uri}")
|
|
|
String evaluationLevelUri;
|
|
|
+ @Autowired
|
|
|
+ KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
|
|
|
|
|
@SneakyThrows
|
|
|
public void taskState(String taskId, String state, String podName) {
|
|
|
-
|
|
|
String projectId = taskMapper.selectProjectIdById(taskId);
|
|
|
- redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod", podName);
|
|
|
- String podDeleteCommand = "kubectl delete pod " + podName;
|
|
|
SshClient client = SshUtil.getClient();
|
|
|
- ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
|
|
|
+ ClientSession session = SshUtil.getSession(client, hostname, username, password);
|
|
|
if ("Running".equals(state)) {
|
|
|
+ // 将运行中的任务的 pod 名称放入 redis
|
|
|
+ stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":pod", podName);
|
|
|
log.info("TaskService--state 修改任务 " + taskId + "的状态为 Running,pod 名称为:" + podName);
|
|
|
taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
|
|
|
- } else if ("Aborted".equals(state)) {
|
|
|
- log.info("TaskService--state 修改任务 " + taskId + "的状态为 Aborted,pod 名称为:" + podName
|
|
|
- + ",并执行删除 pod 命令:" + podDeleteCommand);
|
|
|
- SshUtil.execute(session, podDeleteCommand);
|
|
|
-// taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql()); // 如果任务 abort 代表项目失败
|
|
|
- //result-path-minio: /project/manual-project/
|
|
|
- String minioPathOfErrorLog = resultPathMinio + projectId + "/" + taskId + "error.log";
|
|
|
- boolean objectExist = MinioUtil.isObjectExist(minioClient, bucketName, minioPathOfErrorLog);
|
|
|
- String targetEvaluate;
|
|
|
- if (objectExist) {
|
|
|
- String errorString = MinioUtil.downloadToString(minioClient, bucketName, minioPathOfErrorLog);
|
|
|
- String[] lines = errorString.split("\n");
|
|
|
- StringBuilder errorMessage = new StringBuilder();
|
|
|
- for (String line : lines) {
|
|
|
- if (line.startsWith("Original Error")) {
|
|
|
- errorMessage.append(line).append("\n");
|
|
|
- }
|
|
|
-
|
|
|
- if (line.startsWith("Possible Cause")) {
|
|
|
- errorMessage.append(line);
|
|
|
- break;
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ String podDeleteCommand = "kubectl delete pod " + podName;
|
|
|
+ log.info("TaskService--state 修改任务 " + taskId + "的状态为:" + state + ",pod 名称为:" + podName + ",并执行删除 pod 命令:" + podDeleteCommand);
|
|
|
+ if ("Aborted".equals(state)) {
|
|
|
+ if (retry(projectId, taskId)) {
|
|
|
+ taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //result-path-minio: /project/manual-project/
|
|
|
+ String minioPathOfErrorLog = resultPathMinio + projectId + "/" + taskId + "error.log";
|
|
|
+ boolean objectExist = MinioUtil.isObjectExist(minioClient, bucketName, minioPathOfErrorLog);
|
|
|
+ String targetEvaluate;
|
|
|
+ if (objectExist) {
|
|
|
+ String errorString = MinioUtil.downloadToString(minioClient, bucketName, minioPathOfErrorLog);
|
|
|
+ String[] lines = errorString.split("\n");
|
|
|
+ StringBuilder errorMessage = new StringBuilder();
|
|
|
+ for (String line : lines) {
|
|
|
+ if (line.startsWith("Original Error")) {
|
|
|
+ errorMessage.append(line).append("\n");
|
|
|
+ }
|
|
|
+ if (line.startsWith("Possible Cause")) {
|
|
|
+ errorMessage.append(line);
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
+ targetEvaluate = errorMessage.toString();
|
|
|
+ } else {
|
|
|
+ targetEvaluate = DictConstants.TASK_ERROR_REASON_2;
|
|
|
}
|
|
|
- targetEvaluate = errorMessage.toString();
|
|
|
+ taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), targetEvaluate);
|
|
|
+ } else if ("Terminated".equals(state)) {
|
|
|
+ taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_3);
|
|
|
+ } else if ("PendingAnalysis".equals(state)) {
|
|
|
+ taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
|
|
|
} else {
|
|
|
- targetEvaluate = DictConstants.TASK_ERROR_REASON_2;
|
|
|
+ taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_4);
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), targetEvaluate);
|
|
|
- } else if ("Terminated".equals(state)) {
|
|
|
- log.info("TaskService--state 修改任务 " + taskId + "的状态为 Terminated,pod 名称为:" + podName
|
|
|
- + ",并执行删除 pod 命令:" + podDeleteCommand);
|
|
|
SshUtil.execute(session, podDeleteCommand);
|
|
|
- taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_3);
|
|
|
- } else if ("PendingAnalysis".equals(state)) {
|
|
|
- log.info("TaskService--state 修改任务 " + taskId + "的状态为 PendingAnalysis,pod 名称为:" + podName
|
|
|
- + ",并执行删除 pod 命令:" + podDeleteCommand);
|
|
|
- SshUtil.execute(session, podDeleteCommand);
|
|
|
- taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
|
|
|
- } else {
|
|
|
- log.error("TaskService--state 出现了未知状态:" + state);
|
|
|
- taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_4);
|
|
|
}
|
|
|
- ProjectPO projectPO = manualProjectMapper.selectById(taskId);
|
|
|
- if (projectPO == null) {
|
|
|
- session.close();
|
|
|
- client.stop();
|
|
|
- return;
|
|
|
- }
|
|
|
- Set<String> keys = redisTemplate.keys("manualProject:" + projectId + "*");
|
|
|
- assert keys != null;
|
|
|
- redisTemplate.delete(keys);
|
|
|
- String scenePackageId = projectPO.getScenePackageId(); // 场景测试包 id,指标根 id
|
|
|
-// log.info("------- /state 任务 " + taskId + " 的父项目为:" + projectId);
|
|
|
int taskNum = manualProjectMapper.selectTaskNumById(projectId);
|
|
|
int endTaskNum = manualProjectMapper.selectEndTaskNum(projectId); // 查询已结束的任务 'Aborted', 'PendingAnalysis', 'Terminated'
|
|
|
manualProjectMapper.updateTaskCompleted(projectId, endTaskNum);
|
|
@@ -173,65 +153,52 @@ public class TaskService {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- log.info("结束项目的 job");
|
|
|
- SshUtil.execute(session, "kubectl delete job project-" + projectId);
|
|
|
-
|
|
|
- // 删除 kafka topic
|
|
|
-// SshClient clientKafka = SshUtil.getClient();
|
|
|
-// ClientSession sessionKafka = SshUtil.getSession(clientKafka, hostnameKafka, usernameKafka, passwordKafka);
|
|
|
-// String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", projectId);
|
|
|
-// SshUtil.execute(sessionKafka, topicDeleteCommand);
|
|
|
-// SshUtil.stop(clientKafka, sessionKafka);
|
|
|
-
|
|
|
+ // -------------------------------- 打分 --------------------------------
|
|
|
+ log.info("TaskService--state 项目 " + projectId + "准备打分!");
|
|
|
+ ProjectPO projectPO = manualProjectMapper.selectById(projectId);
|
|
|
+ String packageId = projectPO.getScenePackageId(); // 场景测试包 id,指标根 id
|
|
|
List<TaskPO> taskList = taskMapper.selectTaskListByProjectId(projectId); // 所有任务信息
|
|
|
- int taskNumber = taskList.size();
|
|
|
- log.info("TaskService--state 共有 " + taskNumber + "个任务!");
|
|
|
- taskIndexMapper.deleteFirstByProjectId(projectId);
|
|
|
- taskIndexMapper.deleteLastByProjectId(projectId);
|
|
|
- // -------------------------------- 查询叶子指标 --------------------------------
|
|
|
- List<IndexTemplatePO> leafIndexTemplateList = indexTemplateMapper.selectLeafIndexWithRuleDetailsByPackageId(scenePackageId);
|
|
|
+ indexMapper.deleteFirstByProjectId(projectId);
|
|
|
+ indexMapper.deleteLastByProjectId(projectId);
|
|
|
+ //1 查询场景包对应指标
|
|
|
+ List<IndexTemplatePO> allIndexList = JsonUtil.jsonToList(stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + packageId + ":all"), IndexTemplatePO.class);
|
|
|
+ List<IndexTemplatePO> leafIndexList = JsonUtil.jsonToList(stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + packageId + ":leaf"), IndexTemplatePO.class);
|
|
|
int maxLevel = 1; // 用于计算指标得分
|
|
|
- List<TaskIndexPO> leafTaskIndexList = new ArrayList<>();
|
|
|
- log.info("TaskService--state 共有 " + leafIndexTemplateList.size() + "个叶子节点!");
|
|
|
+ List<LeafIndexPO> leafTaskIndexList = new ArrayList<>();
|
|
|
+ log.info("TaskService--state 共有 " + leafIndexList.size() + "个叶子节点!");
|
|
|
|
|
|
-// SshClient clientScore = SshUtil.getClient();
|
|
|
-// ClientSession sessionScore = SshUtil.getSession(clientScore, hostnameScore, usernameScore, passwordScore);
|
|
|
- for (int i = 0; i < leafIndexTemplateList.size(); i++) {
|
|
|
+ for (int i = 0; i < leafIndexList.size(); i++) {
|
|
|
AtomicReference<String> scoreExplain = new AtomicReference<>(); // 每个叶子指标下的任务的得分说明一样和叶子指标一致
|
|
|
- IndexTemplatePO indexTemplatePO = leafIndexTemplateList.get(i);
|
|
|
- String indexId = indexTemplatePO.getIndexId();
|
|
|
- String parentId = indexTemplatePO.getParentId();
|
|
|
- String rootId = indexTemplatePO.getRootId();
|
|
|
- String weight = indexTemplatePO.getWeight();
|
|
|
- Integer packageLevel = indexTemplatePO.getPackageLevel();
|
|
|
+ IndexTemplatePO leafIndex = leafIndexList.get(i);
|
|
|
+ String indexId = leafIndex.getIndexId();
|
|
|
+ String parentId = leafIndex.getParentId(); // 父 id
|
|
|
+ String rootId = leafIndex.getRootId(); // 包 id
|
|
|
+ String weight = leafIndex.getWeight(); // 权重
|
|
|
+ Integer packageLevel = leafIndex.getPackageLevel(); // 几级指标
|
|
|
+ String ruleName = leafIndex.getRuleName(); // 打分脚本名称,例如 AEB_1-1
|
|
|
+ String ruleDetails = leafIndex.getRuleDetails(); // 打分脚本内容
|
|
|
if (packageLevel > maxLevel) {
|
|
|
maxLevel = packageLevel;
|
|
|
}
|
|
|
log.info("TaskService--state 开始执行对第 " + (i + 1) + " 个叶子节点 " + indexId + " 进行打分!");
|
|
|
- String ruleName = indexTemplatePO.getRuleName(); // 打分脚本名称,例如 AEB_1-1
|
|
|
- String ruleDetails = indexTemplatePO.getRuleDetails(); // 打分脚本内容
|
|
|
+
|
|
|
String ruleFilePath = pyPath + "scripts/" + ruleName.split("_")[0] + "/" + ruleName + ".py";
|
|
|
log.info("TaskService--state 将叶子节点 " + indexId + " 对应的打分规则保存到临时目录:" + ruleFilePath);
|
|
|
FileUtil.writeInputStreamToLocalFile(IoUtil.stringToInputStream(ruleDetails), ruleFilePath);
|
|
|
-
|
|
|
List<TaskPO> taskListOfLeafIndex = taskList.stream()
|
|
|
.filter(task -> indexId.equals(task.getLastTargetId()))
|
|
|
.collect(Collectors.toList());
|
|
|
-
|
|
|
log.info("TaskService--state 叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个成功运行结束任务!");
|
|
|
log.info("TaskService--state 计算叶子节点 " + indexId + " 的得分:" + taskListOfLeafIndex);
|
|
|
- taskListOfLeafIndex.forEach(task2 -> {
|
|
|
- String runState = task2.getRunState();
|
|
|
+ taskListOfLeafIndex.forEach(taskOfLeaf -> {
|
|
|
+ String runState = taskOfLeaf.getRunState();
|
|
|
if (DictConstants.TASK_ANALYSIS.equals(runState)) {
|
|
|
- String task2Id = task2.getId();
|
|
|
+ String task2Id = taskOfLeaf.getId();
|
|
|
taskMapper.updateSuccessStateWithStopTime(task2Id, DictConstants.TASK_ANALYSING, TimeUtil.getNowForMysql());
|
|
|
// 计算每个任务的得分
|
|
|
- String runResultMinio = task2.getRunResultFilePath() + "/Ego.csv";
|
|
|
+ String runResultMinio = taskOfLeaf.getRunResultFilePath() + "/Ego.csv";
|
|
|
String runResultLinux = linuxTempPath + runResultMinio;
|
|
|
-
|
|
|
-// python3 /home/ubuntu/test/Evaluate/main.py /home/ubuntu/test/test_data.csv 4 AEB_1-2
|
|
|
-// String command = "python3 " + pyPath + " " + runResultLinux + " " + task2.getSceneType(); // 默认使用场景名称找打分脚本
|
|
|
- String scoreCommand = "python3 " + pyPath + "main.py " + runResultLinux + " " + task2.getSceneType() + " " + ruleName; // 指定打分脚本
|
|
|
+ String scoreCommand = "python3 " + pyPath + "main.py " + runResultLinux + " " + taskOfLeaf.getSceneType() + " " + ruleName; // 指定打分脚本
|
|
|
String scoreResult;
|
|
|
ScoreTO score;
|
|
|
try {
|
|
@@ -252,24 +219,23 @@ public class TaskService {
|
|
|
throw new RuntimeException("------- TaskService--state 项目" + projectId + "的任务" + task2Id + " 打分出错,命令为:" + scoreCommand + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- task2.setRunState(DictConstants.TASK_ABORTED);
|
|
|
+ taskOfLeaf.setRunState(DictConstants.TASK_ABORTED);
|
|
|
taskMapper.updateFailStateWithStopTime(task2Id, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_5);
|
|
|
log.error(e.getMessage());
|
|
|
return; // 如果打分失败则开始下一个打分
|
|
|
}
|
|
|
assert score != null;
|
|
|
- task2.setReturnSceneId(score.getUnit_scene_ID());
|
|
|
-// task2.setScore(new Random().nextInt(10) * 10.0);
|
|
|
- task2.setScore(score.getUnit_scene_score());
|
|
|
- task2.setTargetEvaluate(score.getEvaluate_item());
|
|
|
- task2.setScoreExplain(score.getScore_description());
|
|
|
- task2.setModifyUserId(USER_ID);
|
|
|
- task2.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
+ taskOfLeaf.setReturnSceneId(score.getUnit_scene_ID());
|
|
|
+ taskOfLeaf.setScore(score.getUnit_scene_score());
|
|
|
+ taskOfLeaf.setTargetEvaluate(score.getEvaluate_item());
|
|
|
+ taskOfLeaf.setScoreExplain(score.getScore_description());
|
|
|
+ taskOfLeaf.setModifyUserId(USER_ID);
|
|
|
+ taskOfLeaf.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
scoreExplain.set(score.getScore_description());
|
|
|
|
|
|
- task2.setRunState(DictConstants.TASK_COMPLETED);
|
|
|
+ taskOfLeaf.setRunState(DictConstants.TASK_COMPLETED);
|
|
|
taskMapper.updateSuccessStateAndScoreResultWithStopTime(
|
|
|
- task2,
|
|
|
+ taskOfLeaf,
|
|
|
DictConstants.TASK_COMPLETED,
|
|
|
TimeUtil.getNowForMysql()
|
|
|
);
|
|
@@ -282,21 +248,22 @@ public class TaskService {
|
|
|
.filter(task -> task.getScore() < 100)
|
|
|
.count();
|
|
|
|
|
|
- // 计算总分
|
|
|
+ // 计算叶子指标下任务得分总和
|
|
|
double leafSum = taskListOfLeafIndex.stream()
|
|
|
.mapToDouble(TaskPO::getScore)
|
|
|
.sum();
|
|
|
// 计算任务的个数
|
|
|
long resultNumberOfCurrentIndex = taskListOfLeafIndex.size();
|
|
|
log.info("TaskService--state 项目 " + projectId + " 的叶子指标" + indexId + "下成功执行的场景数量为:" + resultNumberOfCurrentIndex);
|
|
|
+ // 计算叶子指标得分(任务得分总和 / 任务数量)
|
|
|
double leafIndexScore = resultNumberOfCurrentIndex == 0 ? 0 : NumberUtil.cut(leafSum / resultNumberOfCurrentIndex, 2);
|
|
|
- // -------------------------------- 保存叶子指标得分 --------------------------------
|
|
|
- indexTemplatePO.setTempScore(leafIndexScore);
|
|
|
+ // 创建叶子指标对象
|
|
|
+ leafIndex.setTempScore(leafIndexScore);
|
|
|
|
|
|
- TaskIndexPO leafTaskIndex = TaskIndexPO.builder()
|
|
|
+ LeafIndexPO leafTaskIndex = LeafIndexPO.builder()
|
|
|
.id(StringUtil.getRandomUUID())
|
|
|
.pId(projectId)
|
|
|
- .target(indexTemplatePO.getIndexId())
|
|
|
+ .target(leafIndex.getIndexId())
|
|
|
.notStandardSceneNum((int) notStandardSceneNumber)
|
|
|
.score(leafIndexScore)
|
|
|
.indexId(indexId)
|
|
@@ -311,80 +278,116 @@ public class TaskService {
|
|
|
leafTaskIndex.setModifyUserId(USER_ID);
|
|
|
leafTaskIndex.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
leafTaskIndex.setIsDeleted("0");
|
|
|
+
|
|
|
leafTaskIndexList.add(leafTaskIndex);
|
|
|
}
|
|
|
-// SshUtil.stop(clientScore, sessionScore);
|
|
|
-// // 保存任务分数
|
|
|
-// taskManager.batchUpdateByScoreResult(taskList);
|
|
|
- // 保存末级指标分数
|
|
|
+
|
|
|
+ // 保存叶子指标得分
|
|
|
taskIndexManager.batchInsertLeafIndex(leafTaskIndexList);
|
|
|
// 保存一级指标分数
|
|
|
log.info("TaskService--state 项目 " + projectId + " 的所有任务分数为:" + taskList);
|
|
|
log.info("TaskService--state 根据每个指标的得分和权重算出各个一级指标的得分(即 project 对应的场景测试包下的一级指标)!");
|
|
|
- computeFirst(leafTaskIndexList, projectId, maxLevel);
|
|
|
+ computeFirst(leafTaskIndexList, allIndexList, projectId, maxLevel);
|
|
|
|
|
|
// 调用 server 的接口,计算评价等级
|
|
|
- String tokenUrl = tokenUri + "?grant_type=client_credentials"
|
|
|
- + "&client_id=" + clientId
|
|
|
- + "&client_secret=" + clientSecret;
|
|
|
- log.info("TaskService--state 获取仿真云平台 token:" + tokenUrl);
|
|
|
- String response = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
|
|
|
- ObjectMapper objectMapper = new ObjectMapper();
|
|
|
- JsonNode jsonNode = objectMapper.readTree(response);
|
|
|
- String accessToken = jsonNode.path("access_token").asText();
|
|
|
- log.info("TaskService--state 仿真云平台 token 为:" + accessToken);
|
|
|
- Map<String, String> headers = new HashMap<>();
|
|
|
- headers.put("Authorization", "Bearer " + accessToken);
|
|
|
- Map<String, String> params = new HashMap<>();
|
|
|
- params.put("id", projectId);
|
|
|
- String post = HttpUtil.post(closeableHttpClient, requestConfig, evaluationLevelUri, headers, params);
|
|
|
- log.info("TaskService--state 访问仿真云平台评价等级接口:" + evaluationLevelUri + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
|
|
|
+ evaluationLevel(projectId);
|
|
|
log.info("TaskService--state 项目 " + projectId + " 打分完成!");
|
|
|
manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql()); // 修改该 project 的状态为已完成
|
|
|
log.info("TaskService--state 项目 " + projectId + " 执行完成!");
|
|
|
+
|
|
|
+
|
|
|
+ // -------------------------------- 收尾 --------------------------------
|
|
|
+
|
|
|
+ // 删除所有 key
|
|
|
+// Set<String> keys = redisTemplate.keys("manualProject:" + projectId + "*");
|
|
|
+// assert keys != null;
|
|
|
+// redisTemplate.delete(keys);
|
|
|
+// log.info("------- /state 任务 " + taskId + " 的父项目为:" + projectId);
|
|
|
+
|
|
|
+
|
|
|
+ // 删除 kafka topic
|
|
|
+// SshClient clientKafka = SshUtil.getClient();
|
|
|
+// ClientSession sessionKafka = SshUtil.getSession(clientKafka, hostnameKafka, usernameKafka, passwordKafka);
|
|
|
+// String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", projectId);
|
|
|
+// SshUtil.execute(sessionKafka, topicDeleteCommand);
|
|
|
+// SshUtil.stop(clientKafka, sessionKafka);
|
|
|
+
|
|
|
+
|
|
|
+ // 删除 job
|
|
|
+ SshUtil.execute(session, "kubectl delete job project-" + projectId);
|
|
|
session.close();
|
|
|
client.stop();
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
- public void computeFirst(List<TaskIndexPO> leafTaskIndexList, String projectId, int maxLevel) {
|
|
|
+ public boolean retry(String projectId, String taskId) {
|
|
|
+ //1 首先查看任务是否重试过 3 次
|
|
|
+ int retry = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry")));
|
|
|
+ //2 如果重试次数没有超过 3 次,则重试
|
|
|
+ if (retry > 3) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ String taskJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":message");
|
|
|
+ kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
|
|
|
+ // 消息发送到的topic
|
|
|
+ assert success != null;
|
|
|
+ String topic = success.getRecordMetadata().topic();
|
|
|
+ // 消息发送到的分区
|
|
|
+ int partition = success.getRecordMetadata().partition();
|
|
|
+ // 消息在分区内的offset
|
|
|
+ long offset = success.getRecordMetadata().offset();
|
|
|
+ log.info("------- ManualProjectConsumer 发送消息成功:\n"
|
|
|
+ + "主题 topic 为:" + topic + "\n"
|
|
|
+ + "分区 partition 为:" + partition + "\n"
|
|
|
+ + "偏移量为:" + offset + "\n"
|
|
|
+ + "消息体为:" + taskJson);
|
|
|
+ }, failure -> {
|
|
|
+ log.error("------- 发送消息失败:" + failure.getMessage());
|
|
|
+ });
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
- log.info("------- /state computeFirst 计算父指标得分:" + leafTaskIndexList);
|
|
|
- Iterator<TaskIndexPO> leafTaskIndexIterator = leafTaskIndexList.iterator();
|
|
|
+ public void computeFirst(List<LeafIndexPO> leafIndexList, List<IndexTemplatePO> allIndexTemplateList, String projectId, int maxLevel) {
|
|
|
+
|
|
|
+ log.info("------- /state computeFirst 计算父指标得分:" + leafIndexList);
|
|
|
+ Iterator<LeafIndexPO> leafTaskIndexIterator = leafIndexList.iterator();
|
|
|
// 把 1 级的指标得分直接保存
|
|
|
while (leafTaskIndexIterator.hasNext()) {
|
|
|
- TaskIndexPO leafTaskIndex = leafTaskIndexIterator.next();
|
|
|
+ LeafIndexPO leafTaskIndex = leafTaskIndexIterator.next();
|
|
|
if (leafTaskIndex.getPackageLevel() == 1) {
|
|
|
leafTaskIndex.setCreateUserId(USER_ID);
|
|
|
leafTaskIndex.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
leafTaskIndex.setModifyUserId(USER_ID);
|
|
|
leafTaskIndex.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
leafTaskIndex.setIsDeleted("0");
|
|
|
- taskIndexMapper.insertFirstIndex(leafTaskIndex);
|
|
|
+ indexMapper.insertFirstIndex(leafTaskIndex);
|
|
|
leafTaskIndexIterator.remove();
|
|
|
}
|
|
|
}
|
|
|
- if (leafTaskIndexList.size() > 0) {
|
|
|
-
|
|
|
- List<TaskIndexPO> nextLevelIndexList = new ArrayList<>();
|
|
|
+ if (leafIndexList.size() > 0) {
|
|
|
+ List<LeafIndexPO> nextLevelIndexList = new ArrayList<>();
|
|
|
// 找出等级和 maxLevel 不相同的指标暂时不计算
|
|
|
- leafTaskIndexList.stream()
|
|
|
+ leafIndexList.stream()
|
|
|
.filter(po -> maxLevel != po.getPackageLevel())
|
|
|
.forEach(nextLevelIndexList::add);
|
|
|
// 找出等级和 maxLevel 相同的指标并根据父指标分组
|
|
|
- Map<String, List<TaskIndexPO>> sonTaskIndexMap = leafTaskIndexList.stream()
|
|
|
+ Map<String, List<LeafIndexPO>> sonTaskIndexMap = leafIndexList.stream()
|
|
|
.filter(po -> maxLevel == po.getPackageLevel())
|
|
|
- .collect(Collectors.groupingBy(TaskIndexPO::getParentId));
|
|
|
+ .collect(Collectors.groupingBy(LeafIndexPO::getParentId));
|
|
|
Set<String> parentIdSet = sonTaskIndexMap.keySet();
|
|
|
List<String> parentIdList = CollectionUtil.setToList(parentIdSet);
|
|
|
- List<IndexTemplatePO> parentIndexTemplateList = indexTemplateMapper.selectByIdList(parentIdList);
|
|
|
+
|
|
|
+ List<IndexTemplatePO> parentIndexTemplateList = allIndexTemplateList.stream()
|
|
|
+ .filter(indexTemplate -> parentIdList.contains(indexTemplate.getIndexId()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
// 计算父指标得分
|
|
|
parentIndexTemplateList.forEach(indexTemplate -> {
|
|
|
String weight = indexTemplate.getWeight();
|
|
|
- List<TaskIndexPO> sonTaskIndexList = sonTaskIndexMap.get(indexTemplate.getIndexId());
|
|
|
+ List<LeafIndexPO> sonTaskIndexList = sonTaskIndexMap.get(indexTemplate.getIndexId());
|
|
|
double parentScore = sonTaskIndexList.stream().mapToDouble(taskIndex -> taskIndex.getScore() * Double.parseDouble(taskIndex.getWeight()) / 100).sum();
|
|
|
- TaskIndexPO parentTaskIndex = TaskIndexPO.builder()
|
|
|
+ LeafIndexPO parentTaskIndex = LeafIndexPO.builder()
|
|
|
.id(StringUtil.getRandomUUID())
|
|
|
.pId(projectId)
|
|
|
.target(indexTemplate.getIndexId())
|
|
@@ -398,10 +401,29 @@ public class TaskService {
|
|
|
nextLevelIndexList.add(parentTaskIndex);
|
|
|
});
|
|
|
// 将父指标作为叶子指标递归
|
|
|
- computeFirst(nextLevelIndexList, projectId, maxLevel - 1);
|
|
|
+ computeFirst(nextLevelIndexList, allIndexTemplateList, projectId, maxLevel - 1);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @SneakyThrows
|
|
|
+ public void evaluationLevel(String projectId) {
|
|
|
+ String tokenUrl = tokenUri + "?grant_type=client_credentials"
|
|
|
+ + "&client_id=" + clientId
|
|
|
+ + "&client_secret=" + clientSecret;
|
|
|
+ log.info("TaskService--state 获取仿真云平台 token:" + tokenUrl);
|
|
|
+ String response = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
|
|
|
+ ObjectMapper objectMapper = new ObjectMapper();
|
|
|
+ JsonNode jsonNode = objectMapper.readTree(response);
|
|
|
+ String accessToken = jsonNode.path("access_token").asText();
|
|
|
+ log.info("TaskService--state 仿真云平台 token 为:" + accessToken);
|
|
|
+ Map<String, String> headers = new HashMap<>();
|
|
|
+ headers.put("Authorization", "Bearer " + accessToken);
|
|
|
+ Map<String, String> params = new HashMap<>();
|
|
|
+ params.put("id", projectId);
|
|
|
+ String post = HttpUtil.post(closeableHttpClient, requestConfig, evaluationLevelUri, headers, params);
|
|
|
+ log.info("TaskService--state 访问仿真云平台评价等级接口:" + evaluationLevelUri + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
public Boolean taskConfirm(String taskId) {
|
|
|
// 查询 task 如果不是 pending 则不执行
|
|
@@ -414,7 +436,7 @@ public class TaskService {
|
|
|
// 刷新 redis 心跳时间
|
|
|
ProjectPO projectPO = manualProjectMapper.selectById(taskId);
|
|
|
String projectId = projectPO.getId();
|
|
|
- redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId, TimeUtil.getNowString());
|
|
|
+ stringRedisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick", TimeUtil.getNowString());
|
|
|
}
|
|
|
|
|
|
|