|
@@ -3,6 +3,7 @@ package com.css.simulation.resource.scheduler.app.service;
|
|
|
import api.common.pojo.constants.DictConstants;
|
|
|
import api.common.pojo.enums.MultiSimulationResultTypeEnum;
|
|
|
import api.common.pojo.enums.MultiSimulationStatusEnum;
|
|
|
+import api.common.pojo.enums.MultiSimulationTaskStatusEnum;
|
|
|
import api.common.pojo.param.project.MultiCreateYamlRet;
|
|
|
import api.common.pojo.param.project.MultiSimulationProjectParam;
|
|
|
import api.common.pojo.po.project.MultiSimulationProjectResultPO;
|
|
@@ -15,20 +16,20 @@ import com.css.simulation.resource.scheduler.domain.service.TaskDomainService;
|
|
|
import com.css.simulation.resource.scheduler.infra.configuration.custom.CustomConfiguration;
|
|
|
import com.css.simulation.resource.scheduler.infra.configuration.kubernetes.KubernetesConfiguration;
|
|
|
import com.css.simulation.resource.scheduler.infra.configuration.kubernetes.KubernetesUtil;
|
|
|
-import com.css.simulation.resource.scheduler.infra.db.mysql.mapper.MultiSimulationProjectMapper;
|
|
|
-import com.css.simulation.resource.scheduler.infra.db.mysql.mapper.MultiSimulationProjectResultMapper;
|
|
|
-import com.css.simulation.resource.scheduler.infra.db.mysql.mapper.MultiSimulationProjectTaskRecordMapper;
|
|
|
+import com.css.simulation.resource.scheduler.infra.configuration.minio.MinioConfiguration;
|
|
|
+import com.css.simulation.resource.scheduler.infra.db.mysql.mapper.*;
|
|
|
import com.css.simulation.resource.scheduler.infra.entity.MultiTaskMessageEntity;
|
|
|
import com.css.simulation.resource.scheduler.infra.fs.minio.MinioUtil;
|
|
|
import com.css.simulation.resource.scheduler.infra.db.redis.CustomRedisClient;
|
|
|
import com.css.simulation.resource.scheduler.infra.entity.PrefixEntity;
|
|
|
import com.css.simulation.resource.scheduler.infra.entity.ProjectEntity;
|
|
|
import com.css.simulation.resource.scheduler.infra.entity.TaskEntity;
|
|
|
-import com.css.simulation.resource.scheduler.infra.db.mysql.mapper.SimulationManualProjectTaskMapper;
|
|
|
import com.css.simulation.resource.scheduler.infra.db.redis.RedisUtil;
|
|
|
import com.css.simulation.resource.scheduler.infra.mq.kafka.KafkaUtil;
|
|
|
import io.kubernetes.client.openapi.ApiClient;
|
|
|
+import io.minio.GetPresignedObjectUrlArgs;
|
|
|
import io.minio.MinioClient;
|
|
|
+import io.minio.http.Method;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.kafka.clients.admin.Admin;
|
|
@@ -41,6 +42,7 @@ import org.springframework.util.CollectionUtils;
|
|
|
import javax.annotation.Resource;
|
|
|
import java.io.File;
|
|
|
import java.util.List;
|
|
|
+import java.util.Objects;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
@Service
|
|
@@ -76,9 +78,15 @@ public class TaskApplicationService {
|
|
|
private MultiSimulationProjectMapper multiSimulationProjectMapper;
|
|
|
@Resource
|
|
|
private MultiSimulationProjectTaskRecordMapper taskRecordMapper;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private MultiSimulationSceneMapper multiSimulationSceneMapper;
|
|
|
@Resource
|
|
|
private MultiSimulationProjectResultMapper multiSimulationResultMapper;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private MinioConfiguration minioConfiguration;
|
|
|
+
|
|
|
@Resource(name = "myKafkaAdmin")
|
|
|
private Admin kafkaAdminClient;
|
|
|
@Value("${scheduler.linux-path.multi-pod-yaml-directory}")
|
|
@@ -239,7 +247,7 @@ public class TaskApplicationService {
|
|
|
MultiSimulationProjectParam multiSimulationProjectParam = new MultiSimulationProjectParam();
|
|
|
multiSimulationProjectParam.setProjectId(projectId);
|
|
|
String projectUserId = projectVO.getProjectUserId();
|
|
|
- String clusterUserId = projectDomainService.getClusterUserIdByProjectUserId(projectUserId);
|
|
|
+
|
|
|
// 查询相关信息
|
|
|
String nodeNameKey = "multi-taskId:" + taskEntity.getId();
|
|
|
String value = stringRedisTemplate.opsForValue().get(nodeNameKey);
|
|
@@ -261,15 +269,17 @@ public class TaskApplicationService {
|
|
|
projectDomainService.incrementOneParallelism(isChoiceGpu, nodeName);
|
|
|
// 释放证书
|
|
|
projectDomainService.releaseLicense(projectDomainService.getClusterUserIdByProjectUserId(projectUserId), DictConstants.MODEL_TYPE_VTD, 1);
|
|
|
- if (DictConstants.TASK_COMPLETED.equals(state)) {
|
|
|
- // 更新状态
|
|
|
- taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationStatusEnum.COMPLETED_STATUS.getProjectStatus(), taskEntity.getId(), 0);
|
|
|
+ // 执行完成待分析状态
|
|
|
+ if (DictConstants.TASK_ANALYSIS.equals(state)){
|
|
|
+ taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationTaskStatusEnum.NEED_ANALYSIS_STATUS.getProjectStatus(), taskEntity.getId(), 0);
|
|
|
// 进行仿真评价
|
|
|
String taskBody = taskEntity.getTaskBody();
|
|
|
MultiTaskMessageEntity messageEntity = JSONObject.parseObject(taskBody, MultiTaskMessageEntity.class);
|
|
|
String taskPath = messageEntity.getInfo().getTask_path();
|
|
|
List<String> list = MinioUtil.listAllFileName(minioClient, bucketName, taskPath);
|
|
|
String csvName = null;
|
|
|
+ String allMp4Url = null;
|
|
|
+ String simulationMp4Url = null;
|
|
|
for (String str : list) {
|
|
|
if (StringUtils.contains(str, "csv")) {
|
|
|
csvName = str;
|
|
@@ -279,7 +289,8 @@ public class TaskApplicationService {
|
|
|
if (StringUtils.isNotBlank(csvName)) {
|
|
|
String linuxPath = linuxTempPath + "multiProject/" + projectId + "/" + taskId + "/";
|
|
|
String linuxFile = linuxPath + csvName;
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, taskPath + csvName, linuxFile);
|
|
|
+ String minioPathCsv = taskPath + csvName;
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, minioPathCsv, linuxFile);
|
|
|
String pythonCom = "python3 " + multiVtdPodTemplateAnaPy + " --csvFile=\"" + linuxPath + "\"" + " --outputResultFile=\"" + linuxPath + "\"";
|
|
|
LinuxUtil.execute(pythonCom);
|
|
|
Thread.sleep(10000);
|
|
@@ -298,20 +309,44 @@ public class TaskApplicationService {
|
|
|
multiSimulationProjectResultPO.setId(StringUtil.getRandomUUID()).setSceneId(taskEntity.getSceneId())
|
|
|
.setAbnormalTimeDescription(jsonObject.getString("lane_departure")).setAbnormalType(MultiSimulationResultTypeEnum.OUT_OF_PAVEMENT.getResultType());
|
|
|
multiSimulationResultMapper.insertProjectResult(multiSimulationProjectResultPO);
|
|
|
-
|
|
|
multiSimulationProjectResultPO = new MultiSimulationProjectResultPO();
|
|
|
multiSimulationProjectResultPO.setId(StringUtil.getRandomUUID()).setSceneId(taskEntity.getSceneId())
|
|
|
.setAbnormalTimeDescription(jsonObject.getString("phrases")).setAbnormalType(MultiSimulationResultTypeEnum.LAST_DESCRIPTION.getResultType());
|
|
|
multiSimulationResultMapper.insertProjectResult(multiSimulationProjectResultPO);
|
|
|
- // TODO 生成仿真视频url
|
|
|
+ String urlMp4Pub = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
|
|
|
+ .method(Method.GET)
|
|
|
+ .bucket(bucketName)
|
|
|
+ .object(allMp4Url)
|
|
|
+ .build())
|
|
|
+ .replace(minioConfiguration.getEndpointPrivate(), minioConfiguration.getEndpointPublic());
|
|
|
+ String simUrlMp4Pub = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
|
|
|
+ .method(Method.GET)
|
|
|
+ .bucket(bucketName)
|
|
|
+ .object(simulationMp4Url)
|
|
|
+ .build())
|
|
|
+ .replace(minioConfiguration.getEndpointPrivate(), minioConfiguration.getEndpointPublic());
|
|
|
+ String csvUrlPub = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
|
|
|
+ .method(Method.GET)
|
|
|
+ .bucket(bucketName)
|
|
|
+ .object(minioPathCsv)
|
|
|
+ .build())
|
|
|
+ .replace(minioConfiguration.getEndpointPrivate(), minioConfiguration.getEndpointPublic());
|
|
|
+ multiSimulationSceneMapper.updateSceneResultById(csvUrlPub, urlMp4Pub, simUrlMp4Pub, taskEntity.getSceneId());
|
|
|
|
|
|
} else {
|
|
|
log.info("taskId:{}未找到csv文件", taskId);
|
|
|
}
|
|
|
+ taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationTaskStatusEnum.COMPLETED_STATUS.getProjectStatus(), taskEntity.getId(), 0);
|
|
|
+ } else if (DictConstants.TASK_ABORTED.equals(state)) {
|
|
|
+ taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationTaskStatusEnum.AUTO_TERMINATED_STATUS.getProjectStatus(), taskEntity.getId(), 0);
|
|
|
+ } else if (DictConstants.TASK_TERMINATED.equals(state)) {
|
|
|
+ taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationTaskStatusEnum.TERMINATED_STATUS.getProjectStatus(), taskEntity.getId(), 0);
|
|
|
} else {
|
|
|
// 终止
|
|
|
- taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationStatusEnum.TERMINATED_STATUS.getProjectStatus(), taskEntity.getId(), 0);
|
|
|
- log.info("taskId:{},项目已经停止", taskId);
|
|
|
+ log.info("未知的反馈状态类型projectId:{},taskId:{},state:{}", projectId, taskId, state);
|
|
|
+ return;
|
|
|
+// taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationStatusEnum.TERMINATED_STATUS.getProjectStatus(), taskEntity.getId(), 0);
|
|
|
+// log.info("taskId:{},项目已经停止", taskId);
|
|
|
}
|
|
|
}
|
|
|
List<MultiSimulationProjectTaskRecordPO> recordPOList = multiTaskRecordMapper.selectMultiSimulationProjectTaskRecordList(projectId);
|
|
@@ -335,7 +370,7 @@ public class TaskApplicationService {
|
|
|
//7 删除项目临时文件
|
|
|
FileUtil.rm(linuxTempPath + "multiProject/" + projectId + "/");
|
|
|
// 删除minio临时文件
|
|
|
-// MinioUtil.rmR(minioClient, bucketName, projectResultPathOfMinio + projectId + "/");
|
|
|
+ MinioUtil.rmR(minioClient, bucketName, projectResultPathOfMinio + projectId + "/");
|
|
|
// 删除算法key
|
|
|
// 删除yaml路径redis
|
|
|
// 删除记录podNamekey
|
|
@@ -355,9 +390,41 @@ public class TaskApplicationService {
|
|
|
return taskDomainService.taskConfirm(taskId);
|
|
|
}
|
|
|
|
|
|
+ public Boolean multiConfirm(String taskId) {
|
|
|
+ MultiSimulationProjectTaskRecordPO multiSimulationProjectTaskRecordPO = multiTaskRecordMapper.selectMultiSimulationProjectTaskRecordById(taskId);
|
|
|
+ if (Objects.isNull(multiSimulationProjectTaskRecordPO)){
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ Integer status = multiSimulationProjectTaskRecordPO.getStatus();
|
|
|
+ if (status == MultiSimulationTaskStatusEnum.TERMINATED_STATUS.getProjectStatus() ||
|
|
|
+ status == MultiSimulationTaskStatusEnum.COMPLETED_STATUS.getProjectStatus() ||
|
|
|
+ status == MultiSimulationTaskStatusEnum.AUTO_TERMINATED_STATUS.getProjectStatus() ||
|
|
|
+ status == MultiSimulationTaskStatusEnum.NEED_ANALYSIS_STATUS.getProjectStatus() ||
|
|
|
+ status == MultiSimulationTaskStatusEnum.TERMINATED_STATUS.getProjectStatus()){
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (status == MultiSimulationTaskStatusEnum.RUN_STATUS.getProjectStatus() || status == MultiSimulationTaskStatusEnum.INIT_STATUS.getProjectStatus()){
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
public void tick(String taskId) {
|
|
|
taskDomainService.taskTick(taskId);
|
|
|
}
|
|
|
+
|
|
|
+ public void multiTick(String taskId) {
|
|
|
+ log.info("多模式仿真收到任务 " + taskId + " 的心跳。");
|
|
|
+ MultiSimulationProjectTaskRecordPO taskRecordPO = multiTaskRecordMapper.selectMultiSimulationProjectTaskRecordById(taskId);
|
|
|
+ if (taskRecordPO == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String projectId = taskRecordPO.getProjectId();
|
|
|
+ // 刷新 redis 心跳时间
|
|
|
+ String multiTaskTickPodRedisKey = taskDomainService.getMultiTaskTickPodRedisKey(taskId, projectId);
|
|
|
+ stringRedisTemplate.opsForValue().set(multiTaskTickPodRedisKey, TimeUtil.getNowString());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|