|
@@ -2,7 +2,8 @@ package com.css.simulation.resource.scheduler.app.service;
|
|
|
|
|
|
import api.common.pojo.constants.DictConstants;
|
|
|
import api.common.util.*;
|
|
|
-import com.css.simulation.resource.scheduler.adapter.controller.model.ProjectMessageModel;
|
|
|
+import com.css.simulation.resource.scheduler.adapter.entity.ProjectStartMessageEntity;
|
|
|
+import com.css.simulation.resource.scheduler.adapter.entity.ProjectStopMessageEntity;
|
|
|
import com.css.simulation.resource.scheduler.app.entity.ProjectWaitQueueEntity;
|
|
|
import com.css.simulation.resource.scheduler.app.entity.VehicleEntity;
|
|
|
import com.css.simulation.resource.scheduler.domain.service.ProjectDomainService;
|
|
@@ -103,33 +104,34 @@ public class ProjectApplicationService {
|
|
|
// -------------------------------- Comment --------------------------------
|
|
|
|
|
|
@Async("pool1")
|
|
|
- public void runProject(ProjectMessageModel projectMessageModel) {
|
|
|
- String projectId = projectMessageModel.getProjectId();
|
|
|
- ProjectEntity project = projectDomainService.getProjectByProjectId(projectId);
|
|
|
- if (project == null) {
|
|
|
- throw new RuntimeException("不存在项目:" + projectId);
|
|
|
- }
|
|
|
+ public void runProject(ProjectStartMessageEntity projectStartMessageEntity) {
|
|
|
//1 创建任务文件并固定场景数据
|
|
|
- createTaskAndFixData(projectMessageModel);
|
|
|
+ createTaskAndFixData(projectStartMessageEntity);
|
|
|
//2 校验证书和并行度
|
|
|
- checkIfCanRun(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_RUN_TYPE_EXECUTE).waitingParallelism(projectMessageModel.getParallelism()).projectMessageModel(projectMessageModel).build());
|
|
|
+ checkIfCanRun(
|
|
|
+ ProjectWaitQueueEntity.builder()
|
|
|
+ .waitingType(DictConstants.PROJECT_RUN_TYPE_EXECUTE)
|
|
|
+ .waitingParallelism(projectStartMessageEntity.getParallelism())
|
|
|
+ .projectStartMessageEntity(projectStartMessageEntity)
|
|
|
+ .build()
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 接收到运行信息立即复制一份数据作为据运行数
|
|
|
*
|
|
|
- * @param projectMessageModel 项目启动消息
|
|
|
+ * @param projectStartMessageEntity 项目启动消息
|
|
|
*/
|
|
|
- public void createTaskAndFixData(ProjectMessageModel projectMessageModel) {
|
|
|
+ public void createTaskAndFixData(ProjectStartMessageEntity projectStartMessageEntity) {
|
|
|
//* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
|
|
|
- String projectId = projectMessageModel.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
- String projectType = projectMessageModel.getType(); // 项目类型
|
|
|
+ String projectId = projectStartMessageEntity.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
+ String projectType = projectStartMessageEntity.getType(); // 项目类型
|
|
|
try {
|
|
|
- String modelType = projectMessageModel.getModelType(); // 模型类型,1 动力学模型 2 carsim模型
|
|
|
- String packageId = projectMessageModel.getScenePackageId(); // 场景测试包 id
|
|
|
- String vehicleConfigId = projectMessageModel.getVehicleConfigId(); // 模型配置 id
|
|
|
- String algorithmId = projectMessageModel.getAlgorithmId(); // 模型配置 id
|
|
|
- long videoTime = projectMessageModel.getMaxSimulationTime(); // 结果视频的时长
|
|
|
+ String modelType = projectStartMessageEntity.getModelType(); // 模型类型,1 动力学模型 2 carsim模型
|
|
|
+ String packageId = projectStartMessageEntity.getScenePackageId(); // 场景测试包 id
|
|
|
+ String vehicleConfigId = projectStartMessageEntity.getVehicleConfigId(); // 模型配置 id
|
|
|
+ String algorithmId = projectStartMessageEntity.getAlgorithmId(); // 模型配置 id
|
|
|
+ long videoTime = projectStartMessageEntity.getMaxSimulationTime(); // 结果视频的时长
|
|
|
String userId = projectDomainService.getUserIdByProjectIdAndProjectType(projectId, projectType);
|
|
|
String projectPath = linuxTempPath + "project/" + projectId + "/";
|
|
|
FileUtil.mkdir(projectPath);
|
|
@@ -142,8 +144,8 @@ public class ProjectApplicationService {
|
|
|
//根据场景测试包 packageId,拿到场景集合(包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
|
|
|
List<SceneEntity> sceneEntityList = getSceneList(projectId, packageId);
|
|
|
int taskTotal = sceneEntityList.size();
|
|
|
- projectMessageModel.setTaskTotal(taskTotal);
|
|
|
- projectMessageModel.setTaskCompleted(0);
|
|
|
+ projectStartMessageEntity.setTaskTotal(taskTotal);
|
|
|
+ projectStartMessageEntity.setTaskCompleted(0);
|
|
|
//去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
|
|
|
Set<SceneEntity> sceneEntitySet = new HashSet<>(sceneEntityList);
|
|
|
log.info("项目 " + projectId + " 场景包括:" + sceneEntitySet);
|
|
@@ -318,13 +320,13 @@ public class ProjectApplicationService {
|
|
|
public void checkIfCanRun(ProjectWaitQueueEntity projectWaitQueueEntity) {
|
|
|
final String waitingType = projectWaitQueueEntity.getWaitingType();
|
|
|
final Integer waitingParallelism = projectWaitQueueEntity.getWaitingParallelism();
|
|
|
- final ProjectMessageModel projectMessageModel = projectWaitQueueEntity.getProjectMessageModel();
|
|
|
- log.debug("判断用户是否拥有可分配资源:" + projectMessageModel);
|
|
|
+ final ProjectStartMessageEntity projectStartMessageEntity = projectWaitQueueEntity.getProjectStartMessageEntity();
|
|
|
+ log.debug("判断用户是否拥有可分配资源:" + projectStartMessageEntity);
|
|
|
//1 项目信息
|
|
|
- String modelType = projectMessageModel.getModelType();
|
|
|
- String projectId = projectMessageModel.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
- int parallelism = projectMessageModel.getParallelism(); // 项目并行度
|
|
|
- String projectType = projectMessageModel.getType(); // 项目类型
|
|
|
+ String modelType = projectStartMessageEntity.getModelType();
|
|
|
+ String projectId = projectStartMessageEntity.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
+ int parallelism = projectStartMessageEntity.getParallelism(); // 项目并行度
|
|
|
+ String projectType = projectStartMessageEntity.getType(); // 项目类型
|
|
|
String isChoiceGpu = projectDomainService.getIsChoiceGpuByProjectId(projectId);
|
|
|
//2 剩余并行度
|
|
|
int remainderParallelism = projectDomainService.getRemainderParallelism(isChoiceGpu);
|
|
@@ -341,12 +343,12 @@ public class ProjectApplicationService {
|
|
|
PrefixEntity redisPrefix = projectDomainService.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
final String projectRunningKey = redisPrefix.getProjectRunningKey();
|
|
|
if (remainderParallelism <= 0) {
|
|
|
- wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectMessageModel(projectMessageModel).build());
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectStartMessageEntity(projectStartMessageEntity).build());
|
|
|
} else if (remainderParallelism < parallelism) {
|
|
|
- wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - remainderParallelism).projectMessageModel(projectMessageModel).build());
|
|
|
- run(clusterUserId, remainderParallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - remainderParallelism).projectStartMessageEntity(projectStartMessageEntity).build());
|
|
|
+ run(clusterUserId, remainderParallelism, projectStartMessageEntity, projectRunningKey, isChoiceGpu);
|
|
|
} else {
|
|
|
- run(clusterUserId, parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
|
+ run(clusterUserId, parallelism, projectStartMessageEntity, projectRunningKey, isChoiceGpu);
|
|
|
}
|
|
|
return;
|
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
@@ -382,23 +384,23 @@ public class ProjectApplicationService {
|
|
|
if (DictConstants.PROJECT_WAIT_TYPE_EXECUTE.equals(waitingType)) {
|
|
|
if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
|
|
|
if (remainderSimulationLicense <= 0 || remainderParallelism <= 0) {
|
|
|
- wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectMessageModel(projectMessageModel).build());
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectStartMessageEntity(projectStartMessageEntity).build());
|
|
|
} else if (remainderSimulationLicense < parallelism || remainderParallelism <= parallelism) {
|
|
|
- wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - Math.min(remainderSimulationLicense, remainderParallelism)).projectMessageModel(projectMessageModel).build());
|
|
|
- run(clusterUserId, Math.min(remainderSimulationLicense, remainderParallelism), projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - Math.min(remainderSimulationLicense, remainderParallelism)).projectStartMessageEntity(projectStartMessageEntity).build());
|
|
|
+ run(clusterUserId, Math.min(remainderSimulationLicense, remainderParallelism), projectStartMessageEntity, projectRunningKey, isChoiceGpu);
|
|
|
} else {
|
|
|
- run(clusterUserId, parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
|
+ run(clusterUserId, parallelism, projectStartMessageEntity, projectRunningKey, isChoiceGpu);
|
|
|
}
|
|
|
} else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
|
|
|
usingDynamicLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
|
|
|
remainderDynamicLicense = numDynamicLicense - usingDynamicLicenseNumber;
|
|
|
if (remainderSimulationLicense <= 0 || remainderDynamicLicense <= 0 || remainderParallelism <= 0) {
|
|
|
- wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectMessageModel(projectMessageModel).build());
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectStartMessageEntity(projectStartMessageEntity).build());
|
|
|
} else if (remainderSimulationLicense < parallelism || remainderDynamicLicense < parallelism || remainderParallelism < parallelism) {
|
|
|
- wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - Math.min(Math.min(remainderSimulationLicense, remainderDynamicLicense), remainderParallelism)).projectMessageModel(projectMessageModel).build());
|
|
|
- run(clusterUserId, Math.min(remainderSimulationLicense, Math.min(remainderDynamicLicense, remainderParallelism)), projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - Math.min(Math.min(remainderSimulationLicense, remainderDynamicLicense), remainderParallelism)).projectStartMessageEntity(projectStartMessageEntity).build());
|
|
|
+ run(clusterUserId, Math.min(remainderSimulationLicense, Math.min(remainderDynamicLicense, remainderParallelism)), projectStartMessageEntity, projectRunningKey, isChoiceGpu);
|
|
|
} else {
|
|
|
- run(clusterUserId, parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
|
+ run(clusterUserId, parallelism, projectStartMessageEntity, projectRunningKey, isChoiceGpu);
|
|
|
}
|
|
|
} else {
|
|
|
throw new RuntimeException("未知模型类型:" + modelType);
|
|
@@ -411,7 +413,7 @@ public class ProjectApplicationService {
|
|
|
} else {
|
|
|
if (remainderSimulationLicense < waitingParallelism || remainderParallelism <= waitingParallelism) {
|
|
|
expandParallelism = Math.min(remainderSimulationLicense, remainderParallelism);
|
|
|
- wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(waitingParallelism - expandParallelism).projectMessageModel(projectMessageModel).build());
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(waitingParallelism - expandParallelism).projectStartMessageEntity(projectStartMessageEntity).build());
|
|
|
} else {
|
|
|
expandParallelism = waitingParallelism;
|
|
|
}
|
|
@@ -426,7 +428,7 @@ public class ProjectApplicationService {
|
|
|
} else {
|
|
|
if (remainderSimulationLicense < waitingParallelism || remainderDynamicLicense < waitingParallelism || remainderParallelism < waitingParallelism) {
|
|
|
expandParallelism = Math.min(Math.min(remainderSimulationLicense, remainderDynamicLicense), remainderParallelism);
|
|
|
- wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(waitingParallelism - expandParallelism).projectMessageModel(projectMessageModel).build());
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(waitingParallelism - expandParallelism).projectStartMessageEntity(projectStartMessageEntity).build());
|
|
|
} else {
|
|
|
expandParallelism = waitingParallelism;
|
|
|
}
|
|
@@ -458,7 +460,7 @@ public class ProjectApplicationService {
|
|
|
}
|
|
|
boolean contains = false;
|
|
|
for (ProjectWaitQueueEntity waitQueueEntity : waitingQueue) {
|
|
|
- if (waitQueueEntity.getProjectMessageModel().getProjectId().equals(projectWaitQueueEntity.getProjectMessageModel().getProjectId())) {
|
|
|
+ if (waitQueueEntity.getProjectStartMessageEntity().getProjectId().equals(projectWaitQueueEntity.getProjectStartMessageEntity().getProjectId())) {
|
|
|
contains = true;
|
|
|
waitQueueEntity.setWaitingParallelism(projectWaitQueueEntity.getWaitingParallelism());
|
|
|
}
|
|
@@ -477,32 +479,32 @@ public class ProjectApplicationService {
|
|
|
/**
|
|
|
* 运行项目
|
|
|
*
|
|
|
- * @param projectMessageModel 初始接收到的项目启动信息
|
|
|
- * @param projectRunningKey projectRunningKey
|
|
|
+ * @param projectStartMessageEntity 初始接收到的项目启动信息
|
|
|
+ * @param projectRunningKey projectRunningKey
|
|
|
*/
|
|
|
@SneakyThrows
|
|
|
- public void run(String clusterUserId, int finalParallelism, ProjectMessageModel projectMessageModel, String projectRunningKey, String isChoiceGpu) {
|
|
|
+ public void run(String clusterUserId, int finalParallelism, ProjectStartMessageEntity projectStartMessageEntity, String projectRunningKey, String isChoiceGpu) {
|
|
|
Map<String, Integer> remainderNodeMap = projectDomainService.getRemainderNodeMap(isChoiceGpu);
|
|
|
- String modelType = projectMessageModel.getModelType();
|
|
|
+ String modelType = projectStartMessageEntity.getModelType();
|
|
|
if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)) {
|
|
|
projectDomainService.useLicense(clusterUserId, modelType, finalParallelism);
|
|
|
}
|
|
|
- String projectId = projectMessageModel.getProjectId(); // 项目 id
|
|
|
- final String projectType = projectMessageModel.getType();
|
|
|
- String vehicleConfigId = projectMessageModel.getVehicleConfigId();
|
|
|
+ String projectId = projectStartMessageEntity.getProjectId(); // 项目 id
|
|
|
+ final String projectType = projectStartMessageEntity.getType();
|
|
|
+ String vehicleConfigId = projectStartMessageEntity.getVehicleConfigId();
|
|
|
String projectPath = linuxTempPath + "project/" + projectId + "/";
|
|
|
// -------------------------------- 1 获取任务 json 列表 --------------------------------
|
|
|
List<String> taskJsonList = FileUtil.listAbsolutePathByTypeAndLength(projectPath, "json", (StringUtil.getRandomUUID() + ".json").length());
|
|
|
int taskTotal = taskJsonList.size();
|
|
|
- projectMessageModel.setTaskTotal(taskTotal);
|
|
|
- projectMessageModel.setTaskCompleted(0);
|
|
|
+ projectStartMessageEntity.setTaskTotal(taskTotal);
|
|
|
+ projectStartMessageEntity.setTaskCompleted(0);
|
|
|
// 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
|
|
|
//1 获取剩余并行度和即将使用的各node的并行度
|
|
|
Map<String, Integer> nodeMap = projectDomainService.getNodeMapToUse(isChoiceGpu, finalParallelism);
|
|
|
//2 将指定 node 的并行度减少
|
|
|
nodeMap.keySet().forEach(nodeName -> projectDomainService.decrementParallelism(isChoiceGpu, nodeName, nodeMap.get(nodeName)));
|
|
|
log.info("项目 " + projectId + " 运行在:" + nodeMap);
|
|
|
- stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageModel));
|
|
|
+ stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectStartMessageEntity));
|
|
|
//* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
|
|
|
// String algorithmDockerImage = projectDomainService.getAlgorithmDockerImageByProjectTypeAndProjectId(projectType, projectId);
|
|
|
String algorithmDockerImage = projectDomainService.getAlgorithmDockerImageByProjectId(projectId);
|
|
@@ -774,11 +776,13 @@ public class ProjectApplicationService {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @param projectId 手动项目 id 或自动项目子id
|
|
|
- * @param projectType 项目类型
|
|
|
+ * @param projectStopMessageEntity 项目终止消息实体
|
|
|
*/
|
|
|
@SneakyThrows
|
|
|
- public void stopProject(String projectType, String projectId) {
|
|
|
+ public void stopProject(ProjectStopMessageEntity projectStopMessageEntity) {
|
|
|
+ final String projectId = projectStopMessageEntity.getProjectId();
|
|
|
+ final String projectType = projectStopMessageEntity.getType();
|
|
|
+
|
|
|
// 删除等待队列中的项目
|
|
|
projectDomainService.removeWaitQueue(DictConstants.PROJECT_WAIT_TYPE_ALL, projectId);
|
|
|
String isChoiceGpu = projectDomainService.getIsChoiceGpuByProjectId(projectId);
|