|
@@ -33,8 +33,11 @@ import io.minio.http.Method;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.kafka.clients.admin.Admin;
|
|
import org.apache.kafka.clients.admin.Admin;
|
|
|
|
+import org.apache.kafka.clients.producer.RecordMetadata;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
|
+import org.springframework.kafka.support.SendResult;
|
|
import org.springframework.scheduling.annotation.Async;
|
|
import org.springframework.scheduling.annotation.Async;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.util.CollectionUtils;
|
|
import org.springframework.util.CollectionUtils;
|
|
@@ -73,6 +76,8 @@ public class TaskApplicationService {
|
|
private CustomConfiguration customConfiguration;
|
|
private CustomConfiguration customConfiguration;
|
|
@Resource
|
|
@Resource
|
|
private TaskDomainService taskDomainService;
|
|
private TaskDomainService taskDomainService;
|
|
|
|
+ @Resource
|
|
|
|
+ private KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
|
|
@Resource
|
|
@Resource
|
|
private MultiSimulationProjectMapper multiSimulationProjectMapper;
|
|
private MultiSimulationProjectMapper multiSimulationProjectMapper;
|
|
@@ -234,6 +239,36 @@ public class TaskApplicationService {
|
|
log.info("多模式仿真收到不存在的任务的状态消息:" + taskId);
|
|
log.info("多模式仿真收到不存在的任务的状态消息:" + taskId);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
+ try {
|
|
|
|
+ if (DictConstants.TASK_ABORTED.equals(state)){
|
|
|
|
+ // 进行三次重试
|
|
|
|
+ int multiTaskRetryTimes = taskDomainService.getMultiTaskRetryTimes(taskId);
|
|
|
|
+ if (multiTaskRetryTimes < 3){
|
|
|
|
+ String taskBody = taskEntity.getTaskBody();
|
|
|
|
+ SendResult<String, String> stringStringSendResult = kafkaTemplate.send(taskEntity.getProjectId(), 0,
|
|
|
|
+ taskId, taskBody).get();
|
|
|
|
+ RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
|
|
|
|
+ String topic = recordMetadata.topic(); // 消息发送到的topic
|
|
|
|
+ int partition = recordMetadata.partition(); // 消息发送到的分区
|
|
|
|
+ long offset = recordMetadata.offset(); // 消息在分区内的offset
|
|
|
|
+ log.info("多模式仿真任务发送消息成功, 主题 topic 为项目ID:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset);
|
|
|
|
+ String yamlRedisKeyCache = "multi_yaml_cache_project_" + taskEntity.getProjectId()+"_scene_" + taskEntity.getSceneId();
|
|
|
|
+ String s = stringRedisTemplate.opsForValue().get(yamlRedisKeyCache);
|
|
|
|
+ if (!StringUtil.isEmpty(s)){
|
|
|
|
+ projectDomainService.createMultiPodBeginByYaml(s);
|
|
|
|
+ taskDomainService.setMultiTaskRetryTimes(taskId, multiTaskRetryTimes + 1);
|
|
|
|
+ return;
|
|
|
|
+ }else {
|
|
|
|
+ log.info("进行重试时,未找到yaml,taskId:{},projectId:{}", taskId, taskEntity.getProjectId());
|
|
|
|
+ // 不再进行重试,执行后续流程
|
|
|
|
+ }
|
|
|
|
+ }else {
|
|
|
|
+ log.info("多模式仿真任务重试三次之后失败,taskId:{},projectId:{},sceneId:{}", taskId,taskEntity.getProjectId(), taskEntity.getSceneId());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }catch (Exception e){
|
|
|
|
+ log.info("多模式仿真任务反馈执行abort判断异常taskId:{}", taskId, e);
|
|
|
|
+ }
|
|
String projectId = taskEntity.getProjectId(); // 项目 id
|
|
String projectId = taskEntity.getProjectId(); // 项目 id
|
|
String minioUploadPath = projectId + "/" + taskId + "/";
|
|
String minioUploadPath = projectId + "/" + taskId + "/";
|
|
|
|
|
|
@@ -241,6 +276,7 @@ public class TaskApplicationService {
|
|
customRedisClient.lock(lock1, 1L, 30 * 60L);
|
|
customRedisClient.lock(lock1, 1L, 30 * 60L);
|
|
String isChoiceGpu = DictConstants.USE_GPU;
|
|
String isChoiceGpu = DictConstants.USE_GPU;
|
|
try {
|
|
try {
|
|
|
|
+
|
|
MultiSimulationProjectVO projectVO = multiSimulationProjectMapper.selectMultiSimulationProjectById(projectId);
|
|
MultiSimulationProjectVO projectVO = multiSimulationProjectMapper.selectMultiSimulationProjectById(projectId);
|
|
// Integer status = projectVO.getProjectStatus();
|
|
// Integer status = projectVO.getProjectStatus();
|
|
MultiSimulationProjectParam multiSimulationProjectParam = new MultiSimulationProjectParam();
|
|
MultiSimulationProjectParam multiSimulationProjectParam = new MultiSimulationProjectParam();
|
|
@@ -388,12 +424,13 @@ public class TaskApplicationService {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (allCom){
|
|
if (allCom){
|
|
- multiSimulationProjectParam.setProjectStatus(DictConstants.TASK_ABORTED.equals(state) ? MultiSimulationStatusEnum.TERMINATED_STATUS.getProjectStatus() : MultiSimulationStatusEnum.COMPLETED_STATUS.getProjectStatus());
|
|
|
|
|
|
+// multiSimulationProjectParam.setProjectStatus(DictConstants.TASK_ABORTED.equals(state) ? MultiSimulationStatusEnum.TERMINATED_STATUS.getProjectStatus() : MultiSimulationStatusEnum.COMPLETED_STATUS.getProjectStatus());
|
|
|
|
+ multiSimulationProjectParam.setProjectStatus(MultiSimulationStatusEnum.COMPLETED_STATUS.getProjectStatus());
|
|
multiSimulationProjectMapper.updateMultiSimulationProjectStatus(multiSimulationProjectParam);
|
|
multiSimulationProjectMapper.updateMultiSimulationProjectStatus(multiSimulationProjectParam);
|
|
// 删除kafka topic
|
|
// 删除kafka topic
|
|
KafkaUtil.deleteTopic(kafkaAdminClient, projectId);
|
|
KafkaUtil.deleteTopic(kafkaAdminClient, projectId);
|
|
//6 删除项目 pod 启动文件
|
|
//6 删除项目 pod 启动文件
|
|
- FileUtil.deleteFileBySubstring(multiPodYamlDirectory, projectId);
|
|
|
|
|
|
+ FileUtil.deleteFileBySubstring(multiPodYamlDirectory + "multi/", projectId);
|
|
//7 删除项目临时文件
|
|
//7 删除项目临时文件
|
|
FileUtil.rm(linuxTempPath + "multiProject/" + projectId + "/");
|
|
FileUtil.rm(linuxTempPath + "multiProject/" + projectId + "/");
|
|
// 删除minio临时文件
|
|
// 删除minio临时文件
|