|
@@ -7,6 +7,7 @@ import api.common.pojo.param.project.MultiCreateYamlRet;
|
|
|
import api.common.pojo.param.project.MultiSimulationProjectKafkaParam;
|
|
|
import api.common.pojo.param.project.MultiSimulationProjectParam;
|
|
|
import api.common.pojo.param.project.MultiSimulationSceneKafkaParam;
|
|
|
+import api.common.pojo.po.group.SimulationMageGroupPO;
|
|
|
import api.common.pojo.po.project.MultiSimulationProjectTaskRecordPO;
|
|
|
import api.common.pojo.vo.map.SimulationMapVO;
|
|
|
import api.common.pojo.vo.project.MultiSimulationProjectVO;
|
|
@@ -39,6 +40,7 @@ import io.minio.MinioClient;
|
|
|
import lombok.SneakyThrows;
|
|
|
import lombok.Synchronized;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.kafka.clients.admin.Admin;
|
|
|
import org.apache.kafka.clients.producer.RecordMetadata;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
@@ -125,6 +127,8 @@ public class ProjectApplicationService {
|
|
|
private SimulationMapMapper mapMapper;
|
|
|
@Resource
|
|
|
private MultiSimulationProjectMapper multiSimulationProjectMapper;
|
|
|
+ @Resource
|
|
|
+ private SimulationMageGroupMapper mageGroupMapper;
|
|
|
|
|
|
@Resource
|
|
|
private MultiSimulationProjectTaskRecordMapper taskRecordMapper;
|
|
@@ -584,6 +588,7 @@ public class ProjectApplicationService {
|
|
|
parallel = multiTaskMessageEntityList.size();
|
|
|
}
|
|
|
Integer runState = multiProjectWaitQueue.getRunState();
|
|
|
+
|
|
|
// 使用完塞入
|
|
|
Map<String, Integer> multiNodeMapToUse = projectDomainService.getMultiNodeMapToUse(isChoiceGpu, parallel);
|
|
|
List<MultiCreateYamlRet> yamlList = new ArrayList<>();
|
|
@@ -615,7 +620,9 @@ public class ProjectApplicationService {
|
|
|
throw new RuntimeException("未选取到可用的节点");
|
|
|
}
|
|
|
MultiSimulationSceneKafkaParam multiSimulationSceneKafkaParam = multiProjectWaitQueue.getKafkaParamList().get(i);
|
|
|
- MultiCreateYamlRet multiTempYaml = projectDomainService.createMultiTempYaml(projectId, multiSimulationSceneKafkaParam, messageEntity, modelName, partition, offset, isChoiceGpu);
|
|
|
+ MultiCreateYamlRet multiTempYaml = projectDomainService.createMultiTempYaml(projectId, multiSimulationSceneKafkaParam, messageEntity
|
|
|
+ ,multiProjectWaitQueue.getConnectorPath(),multiProjectWaitQueue.getControllerPath()
|
|
|
+ , modelName, partition, offset, isChoiceGpu);
|
|
|
multiTempYaml.setTaskId(messageEntity.getInfo().getTask_id());
|
|
|
multiTempYaml.setNodeName(modelName);
|
|
|
yamlList.add(multiTempYaml);
|
|
@@ -674,6 +681,9 @@ public class ProjectApplicationService {
|
|
|
KafkaUtil.createTopic(kafkaAdminClient, projectId, finalParallelism, (short) 1); // 创建主题
|
|
|
TimeUnit.SECONDS.sleep(3);
|
|
|
// 需要即时启动的任务(并行度的大小)
|
|
|
+ SimulationMageGroupPO groupPO = mageGroupMapper.selectSimulationMageGroupById(projectStartMessageEntity.getSimulationMageGroupId());
|
|
|
+ String connectorPath = groupPO.getConnectorPath();
|
|
|
+ String controllerPath = groupPO.getControllerPath();
|
|
|
ArrayList<String> yamlToRunRedisKeyList = new ArrayList<>();
|
|
|
for (String taskJsonPath : taskJsonList) {
|
|
|
String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
|
|
@@ -718,7 +728,7 @@ public class ProjectApplicationService {
|
|
|
}
|
|
|
// 只有准备启动(即 currentCount == 0)的时候才指定 cpu 编号
|
|
|
log.info("创建任务 {} 的 yaml:是否使用 gpu (0是1否){},当前节点已创建 yaml 个数为:{},当前节点名称为:{},当前 cpu 编号为:{},镜像名:{}", taskId, isChoiceGpu, currentCount, currentNodeName, cpuOrder, algorithmDockerImage);
|
|
|
- String yamlRedisKey = projectDomainService.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
|
|
|
+ String yamlRedisKey = projectDomainService.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder, connectorPath, controllerPath);
|
|
|
if (currentCount == 0) {
|
|
|
yamlToRunRedisKeyList.add(yamlRedisKey);
|
|
|
}
|
|
@@ -1038,6 +1048,7 @@ public class ProjectApplicationService {
|
|
|
}
|
|
|
|
|
|
@Async("pool1")
|
|
|
+ @SneakyThrows
|
|
|
public void runMultiProject(MultiSimulationProjectKafkaParam projectStartMessageEntity) {
|
|
|
MultiProjectWaitQueueEntity multiTaskAndFixData = createMultiTaskAndFixData(projectStartMessageEntity);
|
|
|
checkIfCanRunMulti(multiTaskAndFixData);
|
|
@@ -1123,6 +1134,16 @@ public class ProjectApplicationService {
|
|
|
KafkaUtil.deleteTopic(kafkaAdminClient, projectId);
|
|
|
|
|
|
List<MultiSimulationSceneKafkaParam> kafkaParamList = projectStartMessageEntity.getKafkaParamList();
|
|
|
+
|
|
|
+ MultiSimulationProjectVO projectVO = multiSimulationProjectMapper.selectMultiSimulationProjectById(projectId);
|
|
|
+ if (Objects.isNull(projectVO)){
|
|
|
+ throw new RuntimeException("未找到有效的仿真任务");
|
|
|
+ }
|
|
|
+ String simulationMageGroupId = projectVO.getSimulationMageGroupId();
|
|
|
+ SimulationMageGroupPO groupPO = mageGroupMapper.selectSimulationMageGroupById(simulationMageGroupId);
|
|
|
+ if (Objects.isNull(groupPO) || StringUtils.isBlank(groupPO.getControllerPath()) || StringUtils.isBlank(groupPO.getConnectorPath())){
|
|
|
+ throw new RuntimeException("仿真镜像组无效");
|
|
|
+ }
|
|
|
List<MultiTaskMessageEntity> entityList = new ArrayList<>();
|
|
|
for (MultiSimulationSceneKafkaParam kafkaParam: kafkaParamList) {
|
|
|
String taskId = StringUtil.getRandomUUID();
|
|
@@ -1226,6 +1247,9 @@ public class ProjectApplicationService {
|
|
|
MultiProjectWaitQueueEntity build = MultiProjectWaitQueueEntity.builder()
|
|
|
.multiTaskMessageEntityList(entityList)
|
|
|
.projectId(projectId)
|
|
|
+ .connectorPath(groupPO.getConnectorPath())
|
|
|
+ .controllerPath(groupPO.getControllerPath())
|
|
|
+ .softwarePath(groupPO.getSoftwarePath())
|
|
|
.waitingParallelism(entityList.size())
|
|
|
.kafkaParamList(kafkaParamList)
|
|
|
.runState(-1)
|
|
@@ -1344,6 +1368,9 @@ public class ProjectApplicationService {
|
|
|
waitMulti(MultiProjectWaitQueueEntity.builder().waitingParallelism(parallelism - remainderParallelism)
|
|
|
.runState(runSt)
|
|
|
.projectId(projectWaitQueueEntity.getProjectId())
|
|
|
+ .softwarePath(projectWaitQueueEntity.getSoftwarePath())
|
|
|
+ .connectorPath(projectWaitQueueEntity.getConnectorPath())
|
|
|
+ .controllerPath(projectWaitQueueEntity.getControllerPath())
|
|
|
.multiTaskMessageEntityList(projectWaitQueueEntity.getMultiTaskMessageEntityList())
|
|
|
.kafkaParamList(projectWaitQueueEntity.getKafkaParamList())
|
|
|
.build());
|
|
@@ -1361,6 +1388,9 @@ public class ProjectApplicationService {
|
|
|
waitMulti(MultiProjectWaitQueueEntity.builder().waitingParallelism(0)
|
|
|
.runState(projectWaitQueueEntity.getMultiTaskMessageEntityList().size() -1)
|
|
|
.projectId(projectWaitQueueEntity.getProjectId())
|
|
|
+ .softwarePath(projectWaitQueueEntity.getSoftwarePath())
|
|
|
+ .connectorPath(projectWaitQueueEntity.getConnectorPath())
|
|
|
+ .controllerPath(projectWaitQueueEntity.getControllerPath())
|
|
|
.kafkaParamList(projectWaitQueueEntity.getKafkaParamList())
|
|
|
.multiTaskMessageEntityList(projectWaitQueueEntity.getMultiTaskMessageEntityList()).build()
|
|
|
);
|