|
@@ -3,9 +3,6 @@ package com.css.simulation.resource.scheduler.domain.service;
|
|
|
import api.common.pojo.constants.DictConstants;
|
|
|
import api.common.pojo.param.scene.SceneEvaluationComputeParam;
|
|
|
import api.common.pojo.param.scene.SceneImportParam;
|
|
|
-import api.common.pojo.po.scene.SceneComplexityPO;
|
|
|
-import api.common.pojo.po.scene.SceneEvaluationRulePO;
|
|
|
-import api.common.pojo.po.scene.SceneRiskPO;
|
|
|
import api.common.util.*;
|
|
|
import com.alibaba.druid.util.StringUtils;
|
|
|
import com.css.simulation.resource.scheduler.app.repository.TaskIndexRepository;
|
|
@@ -16,6 +13,8 @@ import com.css.simulation.resource.scheduler.infra.db.redis.RedisUtil;
|
|
|
import com.css.simulation.resource.scheduler.infra.entity.*;
|
|
|
import com.css.simulation.resource.scheduler.infra.fs.minio.MinioUtil;
|
|
|
import com.css.simulation.resource.scheduler.infra.mq.kafka.KafkaUtil;
|
|
|
+import com.css.simulation.resource.scheduler.infra.runnable.SceneEvaluationComputeRunnable;
|
|
|
+import com.css.simulation.resource.scheduler.infra.threadpool.ThreadPool;
|
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import io.minio.MinioClient;
|
|
@@ -30,18 +29,14 @@ import org.springframework.data.redis.core.StringRedisTemplate;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
-import java.io.*;
|
|
|
-import java.nio.file.Files;
|
|
|
-import java.nio.file.Path;
|
|
|
-import java.nio.file.Paths;
|
|
|
-import java.nio.file.attribute.PosixFilePermission;
|
|
|
+import java.io.BufferedReader;
|
|
|
+import java.io.File;
|
|
|
+import java.io.InputStreamReader;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
-import static api.common.pojo.enums.SceneEvaluationEnum.matchLevelEnumByLevel;
|
|
|
-
|
|
|
@Component
|
|
|
@Slf4j
|
|
|
public class TaskDomainService {
|
|
@@ -174,13 +169,38 @@ public class TaskDomainService {
|
|
|
// 使用 stream 流会出现无法进入循环的情况
|
|
|
for (TaskEntity taskOfLeaf : taskListOfLeafIndex) {
|
|
|
String task2Id = taskOfLeaf.getId();
|
|
|
-
|
|
|
+ // 计算复杂度和危险度
|
|
|
+ TaskEntity taskEntity = simulationManualProjectTaskMapper.selectById(task2Id);
|
|
|
+ String sceneId = taskEntity.getSceneId();
|
|
|
+ if (!StringUtils.isEmpty(projectEntity.getComplexityEvaluationRuleId())) {
|
|
|
+ SceneEvaluationComputeParam sceneEvaluationComputeParam = new SceneEvaluationComputeParam();
|
|
|
+ sceneEvaluationComputeParam.setSceneId(sceneId);
|
|
|
+ sceneEvaluationComputeParam.setSceneXOSCPath(sceneEntityMap.get(sceneId).getScenarioOsc());
|
|
|
+ sceneEvaluationComputeParam.setSceneXODRPath(sceneEntityMap.get(sceneId).getScenarioOdr());
|
|
|
+ sceneEvaluationComputeParam.setSceneType(sceneEntityMap.get(sceneId).getType());
|
|
|
+ sceneEvaluationComputeParam.setTaskId(projectId);
|
|
|
+ sceneEvaluationComputeParam.setComputeType(DictConstants.COMPLEXITY);
|
|
|
+ sceneComplexityEvaluationComputeParamList.add(sceneEvaluationComputeParam);
|
|
|
+ }
|
|
|
String runState = taskOfLeaf.getRunState();
|
|
|
log.debug("任务 " + task2Id + " 的运行状态为:" + runState);
|
|
|
if (DictConstants.TASK_ANALYSIS.equals(runState)) {
|
|
|
simulationManualProjectTaskMapper.updateSuccessStateWithStopTime(task2Id, DictConstants.TASK_ANALYSING, TimeUtil.getNowForMysql());
|
|
|
// 计算每个任务的得分
|
|
|
final String runResultFilePath = taskOfLeaf.getRunResultFilePath();
|
|
|
+
|
|
|
+ // 计算复杂度和危险度
|
|
|
+ if (!StringUtils.isEmpty(projectEntity.getRiskEvaluationRuleId())) {
|
|
|
+ SceneEvaluationComputeParam sceneEvaluationComputeParam = new SceneEvaluationComputeParam();
|
|
|
+ sceneEvaluationComputeParam.setSceneId(sceneId);
|
|
|
+ sceneEvaluationComputeParam.setEvaluationPath(runResultFilePath);
|
|
|
+ sceneEvaluationComputeParam.setSceneType(sceneEntityMap.get(sceneId).getType());
|
|
|
+ sceneEvaluationComputeParam.setTaskId(projectId);
|
|
|
+ sceneEvaluationComputeParam.setComputeType(DictConstants.RISK);
|
|
|
+ sceneEvaluationComputeParam.setAlgorithmId(projectEntity.getAlgorithm());
|
|
|
+ sceneEvaluationComputeParam.setVehicleId(projectEntity.getVehicle());
|
|
|
+ sceneRiskEvaluationComputeParamList.add(sceneEvaluationComputeParam);
|
|
|
+ }
|
|
|
final ArrayList<String> csvResultFilePaths = CollectionUtil.createArrayList(
|
|
|
runResultFilePath + "/Ego.csv",
|
|
|
runResultFilePath + "/evaluation.csv",
|
|
@@ -251,32 +271,6 @@ public class TaskDomainService {
|
|
|
taskOfLeaf.setScored(false);
|
|
|
simulationManualProjectTaskMapper.updateFailStateWithStopTime(task2Id, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_5);
|
|
|
}
|
|
|
-
|
|
|
- // 计算复杂度和危险度
|
|
|
- TaskEntity taskEntity = simulationManualProjectTaskMapper.selectById(task2Id);
|
|
|
- String sceneId = taskEntity.getSceneId();
|
|
|
- if (!StringUtils.isEmpty(projectEntity.getComplexityEvaluationRuleId())) {
|
|
|
- SceneEvaluationComputeParam sceneEvaluationComputeParam = new SceneEvaluationComputeParam();
|
|
|
- sceneEvaluationComputeParam.setSceneId(sceneId);
|
|
|
- sceneEvaluationComputeParam.setSceneXOSCPath(sceneEntityMap.get(sceneId).getScenarioOsc());
|
|
|
- sceneEvaluationComputeParam.setSceneXODRPath(sceneEntityMap.get(sceneId).getScenarioOdr());
|
|
|
- sceneEvaluationComputeParam.setSceneType(sceneEntityMap.get(sceneId).getType());
|
|
|
- sceneEvaluationComputeParam.setTaskId(projectId);
|
|
|
- sceneEvaluationComputeParam.setComputeType(DictConstants.COMPLEXITY);
|
|
|
- sceneComplexityEvaluationComputeParamList.add(sceneEvaluationComputeParam);
|
|
|
- }
|
|
|
- // 计算复杂度和危险度
|
|
|
- if (!StringUtils.isEmpty(projectEntity.getRiskEvaluationRuleId())) {
|
|
|
- SceneEvaluationComputeParam sceneEvaluationComputeParam = new SceneEvaluationComputeParam();
|
|
|
- sceneEvaluationComputeParam.setSceneId(sceneId);
|
|
|
- sceneEvaluationComputeParam.setEvaluationPath(runResultFilePath);
|
|
|
- sceneEvaluationComputeParam.setSceneType(sceneEntityMap.get(sceneId).getType());
|
|
|
- sceneEvaluationComputeParam.setTaskId(projectId);
|
|
|
- sceneEvaluationComputeParam.setComputeType(DictConstants.RISK);
|
|
|
- sceneEvaluationComputeParam.setAlgorithmId(projectEntity.getAlgorithm());
|
|
|
- sceneEvaluationComputeParam.setVehicleId(projectEntity.getVehicle());
|
|
|
- sceneRiskEvaluationComputeParamList.add(sceneEvaluationComputeParam);
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -323,26 +317,24 @@ public class TaskDomainService {
|
|
|
|
|
|
leafIndexList.add(leafIndex);
|
|
|
}
|
|
|
- if (!StringUtils.isEmpty(projectEntity.getComplexityEvaluationRuleId())) {
|
|
|
+ if (CollectionUtil.isNotEmpty(sceneComplexityEvaluationComputeParamList)) {
|
|
|
try {
|
|
|
SceneImportParam sceneImportParam = new SceneImportParam();
|
|
|
sceneImportParam.setSceneEvaluationRuleId(projectEntity.getComplexityEvaluationRuleId());
|
|
|
- computeSceneReference(sceneImportParam, sceneComplexityEvaluationComputeParamList);
|
|
|
+ ThreadPool.sceneEvaluationComputePool.execute(new SceneEvaluationComputeRunnable(sceneImportParam, sceneComplexityEvaluationComputeParamList, linuxTempPath, bucketName));
|
|
|
} catch (Exception e) {
|
|
|
log.error("计算复杂度失败", e);
|
|
|
}
|
|
|
}
|
|
|
- if (!StringUtils.isEmpty(projectEntity.getRiskEvaluationRuleId())) {
|
|
|
+ if (CollectionUtil.isNotEmpty(sceneRiskEvaluationComputeParamList)) {
|
|
|
try {
|
|
|
SceneImportParam sceneImportParam = new SceneImportParam();
|
|
|
sceneImportParam.setSceneEvaluationRuleId(projectEntity.getRiskEvaluationRuleId());
|
|
|
- computeSceneReference(sceneImportParam, sceneRiskEvaluationComputeParamList);
|
|
|
+ ThreadPool.sceneEvaluationComputePool.execute(new SceneEvaluationComputeRunnable(sceneImportParam, sceneRiskEvaluationComputeParamList, linuxTempPath, bucketName));
|
|
|
} catch (Exception e) {
|
|
|
- log.error("计算复杂度失败", e);
|
|
|
+ log.error("计算危险度失败", e);
|
|
|
}
|
|
|
}
|
|
|
- // 删除临时文件
|
|
|
- FileUtil.rm(linuxTempPath + "scene/evaluation/" + projectId + "/"); // 删除临时文件
|
|
|
// 保存叶子指标得分
|
|
|
taskIndexRepository.batchInsertLeafIndex(leafIndexList);
|
|
|
// 保存一级指标分数
|
|
@@ -461,185 +453,6 @@ public class TaskDomainService {
|
|
|
}
|
|
|
|
|
|
|
|
|
- /**
|
|
|
- * 场景上传计算复杂度,计算复杂度需要用到 osc 和 odr 路径,计算危险度用 evaluationPath
|
|
|
- *
|
|
|
- * @param param
|
|
|
- */
|
|
|
- public boolean computeSceneReference(SceneImportParam param, List<SceneEvaluationComputeParam> sceneEvaluationComputeParams) {
|
|
|
- String ruleId = param.getSceneEvaluationRuleId();
|
|
|
- // 获取场景评价规则
|
|
|
- SceneEvaluationRulePO sceneEvaluationRulePO = sceneEvaluationRuleMapper.querySceneEvaluationPyById(ruleId);
|
|
|
- if (sceneEvaluationRulePO == null) {
|
|
|
- log.error(ruleId + " 的场景评价规则已删除");
|
|
|
- return false;
|
|
|
- }
|
|
|
- // 1 判断有没有用户目录,没有则复制
|
|
|
- String evaluationDirectoryOfUser = linuxTempPath + "scene/evaluation/" + sceneEvaluationComputeParams.get(0).getTaskId() + "/";
|
|
|
- String scriptsPath = evaluationDirectoryOfUser + "scripts";
|
|
|
- if (!new File(evaluationDirectoryOfUser).exists()) {
|
|
|
- // 1 将场景评价规则脚本保存到 script 目录
|
|
|
- FileUtil.createDirectory(scriptsPath);
|
|
|
- }
|
|
|
- // 下载场景评价脚本到脚本目录
|
|
|
- if (sceneEvaluationRulePO.getScriptPath() == null) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- String pyMainPath = scriptsPath + "/" + sceneEvaluationRulePO.getRuleId();
|
|
|
- if (!new File(pyMainPath).exists()) {
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, sceneEvaluationRulePO.getScriptPath(), pyMainPath);
|
|
|
- try {
|
|
|
- Path path = Paths.get(pyMainPath);
|
|
|
- Set<PosixFilePermission> permissions = new HashSet<>();
|
|
|
- permissions.add(PosixFilePermission.OWNER_READ);
|
|
|
- permissions.add(PosixFilePermission.OWNER_WRITE);
|
|
|
- permissions.add(PosixFilePermission.OWNER_EXECUTE);
|
|
|
- permissions.add(PosixFilePermission.GROUP_READ);
|
|
|
- permissions.add(PosixFilePermission.GROUP_WRITE);
|
|
|
- permissions.add(PosixFilePermission.GROUP_EXECUTE);
|
|
|
- permissions.add(PosixFilePermission.OTHERS_READ);
|
|
|
- permissions.add(PosixFilePermission.OTHERS_WRITE);
|
|
|
- permissions.add(PosixFilePermission.OTHERS_EXECUTE);
|
|
|
- Files.setPosixFilePermissions(path, permissions);
|
|
|
- } catch (IOException e) {
|
|
|
- log.error("更改执行文件权限失败: " + sceneEvaluationRulePO.getScriptPath(), e);
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
- String scenePathFather = evaluationDirectoryOfUser + "scene/";
|
|
|
- for (SceneEvaluationComputeParam sceneEvaluationComputeParam : sceneEvaluationComputeParams) {
|
|
|
- // 创建场景路径
|
|
|
- String scenePath = evaluationDirectoryOfUser + sceneEvaluationComputeParam.getSceneId();
|
|
|
- if (!new File(scenePath).exists()) {
|
|
|
- FileUtil.createDirectory(scenePath);
|
|
|
- } else {
|
|
|
- // 一个场景只计算一次
|
|
|
- return false;
|
|
|
- }
|
|
|
- try {
|
|
|
- if (StringUtils.equals(sceneEvaluationComputeParam.getComputeType(), DictConstants.COMPLEXITY)) {
|
|
|
- if (StringUtil.isEmpty(sceneEvaluationComputeParam.getSceneXODRPath())
|
|
|
- || StringUtil.isEmpty(sceneEvaluationComputeParam.getSceneXOSCPath())) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- // 计算复杂度,根据场景 id 获取场景信息,下载 osc odr
|
|
|
- String scenarioOsc = sceneEvaluationComputeParam.getSceneXOSCPath();
|
|
|
- String[] splitXosc = scenarioOsc.split("/");
|
|
|
- String xoscName = splitXosc[splitXosc.length - 1];
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, sceneEvaluationComputeParam.getSceneXOSCPath(), scenePath + "/" + xoscName);
|
|
|
- String scenarioOdr = sceneEvaluationComputeParam.getSceneXODRPath();
|
|
|
- String[] splitXodr = scenarioOdr.split("/");
|
|
|
- String xodrName = splitXodr[splitXodr.length - 1];
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, sceneEvaluationComputeParam.getSceneXODRPath(), scenePath + "/" + xodrName);
|
|
|
- } else if (StringUtils.equals(sceneEvaluationComputeParam.getComputeType(), DictConstants.RISK)) {
|
|
|
- if (StringUtil.isEmpty(sceneEvaluationComputeParam.getEvaluationPath())) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- // 计算危险度 从 minio path 下载 csv (ego 和 sensors)
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, sceneEvaluationComputeParam.getEvaluationPath() + "/Ego.csv", scenePath + "/Ego.csv");
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, sceneEvaluationComputeParam.getEvaluationPath() + "/evaluation.csv", scenePath + "/evaluation.csv");
|
|
|
- } else {
|
|
|
- return false;
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("文件下载失败", e);
|
|
|
- FileUtil.deleteFolder(scenePath); // 删除临时文件
|
|
|
- }
|
|
|
- }
|
|
|
- String sceneEvaluationCommand;
|
|
|
- if (StringUtils.equals(sceneEvaluationComputeParams.get(0).getComputeType(), DictConstants.COMPLEXITY)) {
|
|
|
- sceneEvaluationCommand = pyMainPath + " " + scenePathFather + " complexity";
|
|
|
- } else {
|
|
|
- sceneEvaluationCommand = pyMainPath + " " + scenePathFather + " criticality";
|
|
|
- }
|
|
|
- String sceneEvaluationResult;
|
|
|
- log.info("开始执行场景评价命令:" + sceneEvaluationCommand);
|
|
|
- Runtime r = Runtime.getRuntime();
|
|
|
- Process p = null;
|
|
|
- try {
|
|
|
- p = r.exec(sceneEvaluationCommand);
|
|
|
- } catch (IOException e) {
|
|
|
- log.error("执行场景评价脚本失败,脚本命令为: " + sceneEvaluationCommand, e);
|
|
|
- return false;
|
|
|
- }
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- try {
|
|
|
- BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
|
|
|
- String inline;
|
|
|
- while ((inline = br.readLine()) != null) {
|
|
|
- sb.append(inline).append("\n");
|
|
|
- }
|
|
|
- br.close();
|
|
|
- } catch (IOException e) {
|
|
|
- log.error("获取场景评价脚本返回内容失败", e);
|
|
|
- return false;
|
|
|
- }
|
|
|
- sceneEvaluationResult = sb.toString();
|
|
|
- log.info("场景评价结束,结果为:" + sceneEvaluationResult);
|
|
|
- for (SceneEvaluationComputeParam sceneEvaluationComputeParam : sceneEvaluationComputeParams) {
|
|
|
- // 读文件
|
|
|
- StringBuilder result = new StringBuilder();
|
|
|
- try {
|
|
|
- FileInputStream fileInputStream = new FileInputStream(scenePathFather + sceneEvaluationComputeParam.getSceneId() + "/scenario_evaluation.json");
|
|
|
- BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fileInputStream));
|
|
|
-
|
|
|
- String line;
|
|
|
- while ((line = bufferedReader.readLine()) != null) {
|
|
|
- result.append(line).append("\n");
|
|
|
- }
|
|
|
- bufferedReader.close();
|
|
|
- } catch (IOException e) {
|
|
|
- log.error("读取场景评价结果失败", e);
|
|
|
- continue;
|
|
|
- }
|
|
|
- String resultStr = result.toString();
|
|
|
- String replace = StringUtil.replace(resultStr, "'", "\"");
|
|
|
- JsonNode rootNode;
|
|
|
- try {
|
|
|
- ObjectMapper mapper = new ObjectMapper();
|
|
|
- //JSON ----> JsonNode
|
|
|
- rootNode = mapper.readTree(replace);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("场景" + sceneEvaluationComputeParam.getSceneId() + " 的场景评价失败:", e);
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (StringUtils.equals(sceneEvaluationComputeParam.getComputeType(), DictConstants.COMPLEXITY)) {
|
|
|
- String complexity = rootNode.path("复杂度").asText();
|
|
|
- String complexityLevel = rootNode.path("复杂度等级").asText();
|
|
|
- SceneComplexityPO sceneComplexityPO = new SceneComplexityPO();
|
|
|
- sceneComplexityPO.setSceneId(sceneEvaluationComputeParam.getSceneId());
|
|
|
- sceneComplexityPO.setComplexityId(StringUtil.getRandomUUID());
|
|
|
- sceneComplexityPO.setSceneType(sceneEvaluationComputeParam.getSceneType());
|
|
|
- sceneComplexityPO.setRuleId(ruleId);
|
|
|
- sceneComplexityPO.setTaskId(sceneEvaluationComputeParam.getTaskId());
|
|
|
- sceneComplexityPO.setComplexity(complexity);
|
|
|
- sceneComplexityPO.setComplexityLevel(matchLevelEnumByLevel(complexityLevel));
|
|
|
- sceneComplexityPO.setIsDeleted(DictConstants.IS_NOT_DELETED);
|
|
|
- sceneComplexityPO.setCreateUserId(null);
|
|
|
- sceneComplexityPO.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
- sceneComplexityMapper.saveSceneComplexity(sceneComplexityPO);
|
|
|
- } else {
|
|
|
- String risk = rootNode.path("危险度").asText();
|
|
|
- String riskLevel = rootNode.path("危险度等级").asText();
|
|
|
- SceneRiskPO sceneRiskPO = new SceneRiskPO();
|
|
|
- sceneRiskPO.setSceneId(sceneEvaluationComputeParam.getSceneId());
|
|
|
- sceneRiskPO.setRuleId(StringUtil.getRandomUUID());
|
|
|
- sceneRiskPO.setSceneType(sceneEvaluationComputeParam.getSceneType());
|
|
|
- sceneRiskPO.setRuleId(ruleId);
|
|
|
- sceneRiskPO.setTaskId(sceneEvaluationComputeParam.getTaskId());
|
|
|
- sceneRiskPO.setRisk(risk);
|
|
|
- sceneRiskPO.setRiskLevel(matchLevelEnumByLevel(riskLevel));
|
|
|
- sceneRiskPO.setIsDeleted(DictConstants.IS_NOT_DELETED);
|
|
|
- sceneRiskPO.setCreateUserId(null);
|
|
|
- sceneRiskPO.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
- sceneRiskMapper.saveSceneRisk(sceneRiskPO);
|
|
|
- }
|
|
|
- }
|
|
|
- // 删除临时文件
|
|
|
- FileUtil.deleteFolder(linuxTempPath + "scene/evaluation/" + sceneEvaluationComputeParams.get(0).getTaskId()); // 删除临时文件
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
@SneakyThrows
|
|
|
public List<SceneEntity> getSceneList(String projectId, String packageId) {
|
|
|
|