|
@@ -14,6 +14,7 @@ import com.css.simulation.resource.scheduler.pojo.po.TaskIndexPO;
|
|
|
import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
|
|
|
import com.css.simulation.resource.scheduler.pojo.to.ScoreTO;
|
|
|
import com.css.simulation.resource.scheduler.util.MinioUtil;
|
|
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import io.minio.MinioClient;
|
|
@@ -77,6 +78,8 @@ public class TaskService {
|
|
|
String usernameKafka;
|
|
|
@Value("${spring.kafka.password}")
|
|
|
String passwordKafka;
|
|
|
+ @Value("${spring.kafka.delete-command}")
|
|
|
+ String kafkaDeleteCommand;
|
|
|
@Value("${scheduler.score.py-path}")
|
|
|
String pyPath;
|
|
|
@Value("${scheduler.linux-temp-path}")
|
|
@@ -122,8 +125,9 @@ public class TaskService {
|
|
|
projectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED); // 修改该 project 的状态为已完成
|
|
|
LinuxUtil.execute("kubectl delete job project-" + projectId);
|
|
|
ClientSession sessionKafka = SshUtil.getSession(hostnameKafka, usernameKafka, passwordKafka);
|
|
|
-// SshUtil.execute(sessionKafka, "/opt/module/kafka_2.13-3.1.0/bin/kafka-topics.sh --bootstrap-server " + hostnameKafka + ":9092 --delete --topic " + projectId);
|
|
|
- SshUtil.execute(sessionKafka, "/opt/module/kafka_2.13-3.1.0/bin/kafka-topics.sh --bootstrap-server " + hostnameKafka + ":9092 --delete --topic test");
|
|
|
+// String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand,"topicName",projectId);
|
|
|
+ String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", "test");
|
|
|
+ SshUtil.execute(sessionKafka, topicDeleteCommand);
|
|
|
List<TaskPO> taskList = taskMapper.selectTaskListByProjectId(projectId); // 所有任务信息
|
|
|
// -------------------------------- 查询叶子指标 --------------------------------
|
|
|
List<IndexTemplatePO> leafIndexTemplateList = indexTemplateMapper.selectLeafIndexWithRuleDetailsByPackageId(scenePackageId);
|
|
@@ -165,24 +169,25 @@ public class TaskService {
|
|
|
String task2Id = task2.getId();
|
|
|
taskMapper.updateState(task2Id, DictConstants.TASK_ANALYSING);
|
|
|
// 计算每个任务的得分
|
|
|
- ScoreTO score;
|
|
|
+
|
|
|
String runResultMinio = task2.getRunResultFilePath() + "/Ego.csv";
|
|
|
String runResultLinux = linuxTempPath + runResultMinio;
|
|
|
|
|
|
// python3 /home/ubuntu/test/Evaluate/main.py /home/ubuntu/test/test_data.csv 4 AEB_1-2
|
|
|
// String command = "python3 " + pyPath + " " + runResultLinux + " " + task2.getSceneType(); // 默认使用场景名称找打分脚本
|
|
|
String command = "python3 " + pyPath + "main.py " + runResultLinux + " " + task2.getSceneType() + " " + ruleName; // 指定打分脚本
|
|
|
+ String scoreResult;
|
|
|
try {
|
|
|
try {
|
|
|
log.info("------- /state 下载 minio 上的结果文件 " + runResultMinio + " 到本地:" + runResultLinux);
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, runResultMinio, runResultLinux);
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, runResultMinio, runResultLinux); // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
|
|
|
} catch (Exception e) {
|
|
|
throw new RuntimeException("------- /state 下载 minio 上的结果文件出错:" + e.getMessage());
|
|
|
}
|
|
|
try {
|
|
|
log.info("------- /state 开始执行打分命令:" + command);
|
|
|
- score = JsonUtil.jsonToBean(SshUtil.execute(sessionScore, command), ScoreTO.class);
|
|
|
- log.info("------- /state 打分结束,结果为:" + score);
|
|
|
+ scoreResult = SshUtil.execute(sessionScore, command);
|
|
|
+ log.info("------- /state 打分结束,结果为:" + scoreResult);
|
|
|
} catch (IOException e) {
|
|
|
throw new RuntimeException("------- /state 任务 " + task2Id + " 打分出错,命令为:" + command + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
|
|
|
}
|
|
@@ -190,6 +195,14 @@ public class TaskService {
|
|
|
taskMapper.updateState(task2Id, DictConstants.TASK_ABORTED);
|
|
|
throw new RuntimeException(e.getMessage());
|
|
|
}
|
|
|
+ ScoreTO score = null;
|
|
|
+ try {
|
|
|
+ String replace = StringUtil.replace(scoreResult, "'", "\"");
|
|
|
+ score = JsonUtil.jsonToBean(replace, ScoreTO.class);
|
|
|
+ } catch (JsonProcessingException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ assert score != null;
|
|
|
task2.setReturnSceneId(score.getUnit_scene_ID());
|
|
|
task2.setScore(score.getUnit_scene_score());
|
|
|
task2.setTargetEvaluate(score.getEvaluate_item());
|
|
@@ -220,8 +233,9 @@ public class TaskService {
|
|
|
}
|
|
|
SshUtil.stop(sessionScore);
|
|
|
|
|
|
- // 根据每个指标的得分和权重算出 project 的总得分。
|
|
|
+ log.info("------- /state 根据每个指标的得分和权重算出 project 的总得分:" + leafIndexTemplateList);
|
|
|
double totalScore = compute(leafIndexTemplateList);
|
|
|
+ log.info("------- /state 总得分为:" + totalScore);
|
|
|
// 保存分数
|
|
|
// 保存任务分数
|
|
|
taskManager.batchUpdateByScoreResult(taskList);
|
|
@@ -267,8 +281,9 @@ public class TaskService {
|
|
|
public double compute(List<IndexTemplatePO> leaf) {
|
|
|
double result = 0.0;
|
|
|
Map<String, List<IndexTemplatePO>> groups = leaf.stream().collect(Collectors.groupingBy(IndexTemplatePO::getParentId));
|
|
|
- Set<String> idSet = groups.keySet();
|
|
|
- List<IndexTemplatePO> indexTemplatePOList = indexTemplateMapper.selectByIdList(CollectionUtil.setToList(idSet));
|
|
|
+ Set<String> parentIdSet = groups.keySet();
|
|
|
+ log.info("------- /state 将叶子指标按父指标分组父指标集合为:" + parentIdSet);
|
|
|
+ List<IndexTemplatePO> indexTemplatePOList = indexTemplateMapper.selectByIdList(CollectionUtil.setToList(parentIdSet));
|
|
|
indexTemplatePOList.forEach(index1 -> {
|
|
|
double sum = groups.get(index1.getIndexId()).stream().mapToDouble(index2 -> index2.getTempScore() * Double.parseDouble(index2.getWeight())).sum();
|
|
|
index1.setTempScore(sum);
|