|
@@ -1,18 +1,17 @@
|
|
|
-package com.css.simulation.resource.scheduler.web.consumer;
|
|
|
+package com.css.simulation.resource.scheduler.consumer;
|
|
|
|
|
|
|
|
|
import api.common.pojo.constants.DictConstants;
|
|
|
import api.common.pojo.dto.ProjectMessageDTO;
|
|
|
import api.common.util.*;
|
|
|
-import com.css.simulation.resource.scheduler.common.util.ApacheKafkaUtil;
|
|
|
-import com.css.simulation.resource.scheduler.common.util.MinioUtil;
|
|
|
-import com.css.simulation.resource.scheduler.common.util.ProjectUtil;
|
|
|
-import com.css.simulation.resource.scheduler.dao.entity.*;
|
|
|
-import com.css.simulation.resource.scheduler.dao.manager.ProjectManager;
|
|
|
-import com.css.simulation.resource.scheduler.dao.manager.TaskManager;
|
|
|
-import com.css.simulation.resource.scheduler.dao.mapper.*;
|
|
|
+import com.css.simulation.resource.scheduler.entity.*;
|
|
|
+import com.css.simulation.resource.scheduler.manager.ProjectManager;
|
|
|
+import com.css.simulation.resource.scheduler.manager.TaskManager;
|
|
|
+import com.css.simulation.resource.scheduler.mapper.*;
|
|
|
import com.css.simulation.resource.scheduler.service.ProjectService;
|
|
|
-import com.css.simulation.resource.scheduler.service.domain.*;
|
|
|
+import com.css.simulation.resource.scheduler.util.ApacheKafkaUtil;
|
|
|
+import com.css.simulation.resource.scheduler.util.MinioUtil;
|
|
|
+import com.css.simulation.resource.scheduler.util.ProjectUtil;
|
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import io.minio.MinioClient;
|
|
@@ -124,13 +123,13 @@ public class ProjectConsumer {
|
|
|
// -------------------------------- 1 查询场景 --------------------------------
|
|
|
log.info("项目 " + projectId + " 开始查询场景。");
|
|
|
//根据场景测试包 packageId,拿到场景集合(包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
|
|
|
- List<ScenePO> scenePOList = projectService.getSceneList(projectId, packageId);
|
|
|
- int taskTotal = scenePOList.size();
|
|
|
+ List<SceneEntity> sceneEntityList = projectService.getSceneList(projectId, packageId);
|
|
|
+ int taskTotal = sceneEntityList.size();
|
|
|
projectMessageDTO.setTaskTotal(taskTotal);
|
|
|
projectMessageDTO.setTaskCompleted(0);
|
|
|
//去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
|
|
|
- Set<ScenePO> scenePOSet = new HashSet<>(scenePOList);
|
|
|
- log.info("项目 " + projectId + " 场景包括:" + scenePOSet);
|
|
|
+ Set<SceneEntity> sceneEntitySet = new HashSet<>(sceneEntityList);
|
|
|
+ log.info("项目 " + projectId + " 场景包括:" + sceneEntitySet);
|
|
|
// -------------------------------- 2 算法导入 --------------------------------
|
|
|
log.info("项目 " + projectId + " 开始算法导入。");
|
|
|
String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
|
|
@@ -139,14 +138,14 @@ public class ProjectConsumer {
|
|
|
if ("1".equals(modelType)) {
|
|
|
log.info("项目 " + projectId + " 开始查询模型。");
|
|
|
//2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
|
- VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
- List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
- List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
+ VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
+ List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
+ List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
// -------------------------------- 4 保存任务消息 --------------------------------
|
|
|
log.info("项目 " + projectId + " 开始保存任务消息。");
|
|
|
- List<TaskPO> taskList = new ArrayList<>();
|
|
|
- for (ScenePO scenePO : scenePOSet) {
|
|
|
- String sceneId = scenePO.getId();
|
|
|
+ List<TaskEntity> taskList = new ArrayList<>();
|
|
|
+ for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
|
+ String sceneId = sceneEntity.getId();
|
|
|
//3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
|
|
|
List<String> lastTargetIdList = null;
|
|
|
if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
@@ -160,25 +159,25 @@ public class ProjectConsumer {
|
|
|
for (String lastTargetId : lastTargetIdList) {
|
|
|
String taskId = StringUtil.getRandomUUID();
|
|
|
// 保存任务信息
|
|
|
- TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
|
|
|
+ TaskEntity taskEntity = TaskEntity.builder() // run_start_time 和 run_end_time 不填
|
|
|
.id(taskId)
|
|
|
.pId(projectId)
|
|
|
.sceneId(sceneId)
|
|
|
.lastTargetId(lastTargetId)
|
|
|
- .sceneName(scenePO.getName())
|
|
|
- .sceneType(scenePO.getType())
|
|
|
+ .sceneName(sceneEntity.getName())
|
|
|
+ .sceneType(sceneEntity.getType())
|
|
|
.runState(DictConstants.TASK_PENDING)
|
|
|
.runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId)
|
|
|
.build();
|
|
|
- taskPO.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
- taskPO.setCreateUserId(userId);
|
|
|
- taskPO.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
- taskPO.setModifyUserId(userId);
|
|
|
- taskPO.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
- taskPO.setIsDeleted("0");
|
|
|
- taskList.add(taskPO);
|
|
|
+ taskEntity.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
+ taskEntity.setCreateUserId(userId);
|
|
|
+ taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
+ taskEntity.setModifyUserId(userId);
|
|
|
+ taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
+ taskEntity.setIsDeleted("0");
|
|
|
+ taskList.add(taskEntity);
|
|
|
// 将 xosc、xodr、osgb 全部上传到仿真结果路径
|
|
|
- String scenarioOsc = scenePO.getScenarioOsc();
|
|
|
+ String scenarioOsc = sceneEntity.getScenarioOsc();
|
|
|
String[] splitXosc = scenarioOsc.split("/");
|
|
|
String xoscName = splitXosc[splitXosc.length - 1];
|
|
|
String[] xoscNameSplit = xoscName.split("\\.");
|
|
@@ -188,7 +187,7 @@ public class ProjectConsumer {
|
|
|
MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
|
|
|
MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
|
|
|
|
|
|
- String scenarioOdr = scenePO.getScenarioOdr();
|
|
|
+ String scenarioOdr = sceneEntity.getScenarioOdr();
|
|
|
String[] splitXodr = scenarioOdr.split("/");
|
|
|
String xodrName = splitXodr[splitXodr.length - 1];
|
|
|
String[] xodrNameSplit = xodrName.split("\\.");
|
|
@@ -198,7 +197,7 @@ public class ProjectConsumer {
|
|
|
MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
|
|
|
MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
|
|
|
|
|
|
- String scenarioOsgb = scenePO.getScenarioOsgb();
|
|
|
+ String scenarioOsgb = sceneEntity.getScenarioOsgb();
|
|
|
String[] splitOsgb = scenarioOsgb.split("/");
|
|
|
String osgbName = splitOsgb[splitOsgb.length - 1];
|
|
|
String[] osgbNameSplit = osgbName.split("\\.");
|
|
@@ -210,49 +209,49 @@ public class ProjectConsumer {
|
|
|
log.info("cacheManualProject() 已经将 xosc、xodr、osgb 上传到 minio 的结果文件目录:" + projectResultPathOfMinio);
|
|
|
|
|
|
// 组装 task 消息
|
|
|
- TaskTO taskTO = TaskTO.builder()
|
|
|
- .info(InfoTO.builder()
|
|
|
- .project_id(taskPO.getPId())
|
|
|
- .task_id(taskPO.getId())
|
|
|
- .task_path(taskPO.getRunResultFilePath())
|
|
|
+ TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder()
|
|
|
+ .info(InfoEntity.builder()
|
|
|
+ .project_id(taskEntity.getPId())
|
|
|
+ .task_id(taskEntity.getId())
|
|
|
+ .task_path(taskEntity.getRunResultFilePath())
|
|
|
.default_time(videoTime)
|
|
|
.build())
|
|
|
- .scenario(ScenarioTO.builder()
|
|
|
+ .scenario(ScenarioEntity.builder()
|
|
|
.scenario_osc(xoscPathOfMinio)
|
|
|
.scenario_odr(xodrPathOfMinio)
|
|
|
.scenario_osgb(osgbPathOfMinio)
|
|
|
.build())
|
|
|
.vehicle(VehicleTO.builder()
|
|
|
- .model(ModelTO.builder()
|
|
|
- .model_label(vehiclePO.getModelLabel())
|
|
|
+ .model(ModelEntity.builder()
|
|
|
+ .model_label(vehicleEntity.getModelLabel())
|
|
|
.build())
|
|
|
.dynamics(DynamicsTO.builder()
|
|
|
- .dynamics_maxspeed(vehiclePO.getMaxSpeed())
|
|
|
- .dynamics_enginepower(vehiclePO.getEnginePower())
|
|
|
- .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
|
|
|
- .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
|
|
|
- .dynamics_mass(vehiclePO.getMass())
|
|
|
- .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
|
|
|
- .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
|
|
|
- .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
|
|
|
- .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
|
|
|
- .dynamics_wheeldrive(vehiclePO.getWheelDrive())
|
|
|
- .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
|
|
|
- .dynamics_distfront(vehiclePO.getFrontDistance())
|
|
|
- .dynamics_distrear(vehiclePO.getRearDistance())
|
|
|
- .dynamics_distleft(vehiclePO.getLeftDistance())
|
|
|
- .dynamics_distright(vehiclePO.getRightDistance())
|
|
|
- .dynamics_distheight(vehiclePO.getHeightDistance())
|
|
|
- .dynamics_wheelbase(vehiclePO.getWheelbase())
|
|
|
+ .dynamics_maxspeed(vehicleEntity.getMaxSpeed())
|
|
|
+ .dynamics_enginepower(vehicleEntity.getEnginePower())
|
|
|
+ .dynamics_maxdecel(vehicleEntity.getMaxDeceleration())
|
|
|
+ .dynamics_maxsteering(vehicleEntity.getMaxSteeringAngle())
|
|
|
+ .dynamics_mass(vehicleEntity.getMass())
|
|
|
+ .dynamics_frontsurfaceeffective(vehicleEntity.getFrontSurfaceEffective())
|
|
|
+ .dynamics_airdragcoefficient(vehicleEntity.getAirDragCoefficient())
|
|
|
+ .dynamics_rollingresistance(vehicleEntity.getRollingResistanceCoefficient())
|
|
|
+ .dynamics_wheeldiameter(vehicleEntity.getWheelDiameter())
|
|
|
+ .dynamics_wheeldrive(vehicleEntity.getWheelDrive())
|
|
|
+ .dynamics_overallefficiency(vehicleEntity.getOverallEfficiency())
|
|
|
+ .dynamics_distfront(vehicleEntity.getFrontDistance())
|
|
|
+ .dynamics_distrear(vehicleEntity.getRearDistance())
|
|
|
+ .dynamics_distleft(vehicleEntity.getLeftDistance())
|
|
|
+ .dynamics_distright(vehicleEntity.getRightDistance())
|
|
|
+ .dynamics_distheight(vehicleEntity.getHeightDistance())
|
|
|
+ .dynamics_wheelbase(vehicleEntity.getWheelbase())
|
|
|
.build())
|
|
|
- .sensors(SensorsTO.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
- .camera(cameraPOList)
|
|
|
- .OGT(ogtPOList)
|
|
|
+ .sensors(SensorsEntity.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
+ .camera(cameraEntityList)
|
|
|
+ .OGT(ogtEntityList)
|
|
|
.build())
|
|
|
.build())
|
|
|
.build();
|
|
|
- FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskTO), projectPath + taskId + ".json");
|
|
|
- log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskTO.getInfo().getTask_id());
|
|
|
+ FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
|
|
|
+ log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
}
|
|
|
}
|
|
|
taskManager.batchInsertTask(taskList);
|
|
@@ -260,14 +259,14 @@ public class ProjectConsumer {
|
|
|
} else if ("2".equals(modelType)) {
|
|
|
log.info("项目 " + projectId + " 开始查询模型。");
|
|
|
|
|
|
- VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
- List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
- List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
+ VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
+ List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
+ List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
// -------------------------------- 4 保存任务消息 --------------------------------
|
|
|
log.info("项目 " + projectId + " 开始保存任务消息。");
|
|
|
- List<TaskPO> taskList = new ArrayList<>();
|
|
|
- for (ScenePO scenePO : scenePOSet) {
|
|
|
- String sceneId = scenePO.getId();
|
|
|
+ List<TaskEntity> taskList = new ArrayList<>();
|
|
|
+ for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
|
+ String sceneId = sceneEntity.getId();
|
|
|
//3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
|
|
|
List<String> lastTargetIdList = null;
|
|
|
if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
@@ -281,25 +280,25 @@ public class ProjectConsumer {
|
|
|
for (String lastTargetId : lastTargetIdList) {
|
|
|
String taskId = StringUtil.getRandomUUID();
|
|
|
// 保存任务信息
|
|
|
- TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
|
|
|
+ TaskEntity taskEntity = TaskEntity.builder() // run_start_time 和 run_end_time 不填
|
|
|
.id(taskId)
|
|
|
.pId(projectId)
|
|
|
.sceneId(sceneId)
|
|
|
.lastTargetId(lastTargetId)
|
|
|
- .sceneName(scenePO.getName())
|
|
|
- .sceneType(scenePO.getType())
|
|
|
+ .sceneName(sceneEntity.getName())
|
|
|
+ .sceneType(sceneEntity.getType())
|
|
|
.runState(DictConstants.TASK_PENDING)
|
|
|
.runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId)
|
|
|
.build();
|
|
|
- taskPO.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
- taskPO.setCreateUserId(userId);
|
|
|
- taskPO.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
- taskPO.setModifyUserId(userId);
|
|
|
- taskPO.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
- taskPO.setIsDeleted("0");
|
|
|
- taskList.add(taskPO);
|
|
|
+ taskEntity.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
+ taskEntity.setCreateUserId(userId);
|
|
|
+ taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
+ taskEntity.setModifyUserId(userId);
|
|
|
+ taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
+ taskEntity.setIsDeleted("0");
|
|
|
+ taskList.add(taskEntity);
|
|
|
// 将 xosc、xodr、osgb 全部上传到仿真结果路径
|
|
|
- String scenarioOsc = scenePO.getScenarioOsc();
|
|
|
+ String scenarioOsc = sceneEntity.getScenarioOsc();
|
|
|
String[] splitXosc = scenarioOsc.split("/");
|
|
|
String xoscName = splitXosc[splitXosc.length - 1];
|
|
|
String[] xoscNameSplit = xoscName.split("\\.");
|
|
@@ -309,7 +308,7 @@ public class ProjectConsumer {
|
|
|
MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
|
|
|
MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
|
|
|
|
|
|
- String scenarioOdr = scenePO.getScenarioOdr();
|
|
|
+ String scenarioOdr = sceneEntity.getScenarioOdr();
|
|
|
String[] splitXodr = scenarioOdr.split("/");
|
|
|
String xodrName = splitXodr[splitXodr.length - 1];
|
|
|
String[] xodrNameSplit = xodrName.split("\\.");
|
|
@@ -319,7 +318,7 @@ public class ProjectConsumer {
|
|
|
MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
|
|
|
MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
|
|
|
|
|
|
- String scenarioOsgb = scenePO.getScenarioOsgb();
|
|
|
+ String scenarioOsgb = sceneEntity.getScenarioOsgb();
|
|
|
String[] splitOsgb = scenarioOsgb.split("/");
|
|
|
String osgbName = splitOsgb[splitOsgb.length - 1];
|
|
|
String[] osgbNameSplit = osgbName.split("\\.");
|
|
@@ -332,32 +331,32 @@ public class ProjectConsumer {
|
|
|
|
|
|
// 组装 task 消息
|
|
|
// carsim 不需要查询模型参数
|
|
|
- TaskTO taskTO = TaskTO.builder()
|
|
|
- .info(InfoTO.builder()
|
|
|
- .project_id(taskPO.getPId())
|
|
|
- .task_id(taskPO.getId())
|
|
|
- .task_path(taskPO.getRunResultFilePath())
|
|
|
+ TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder()
|
|
|
+ .info(InfoEntity.builder()
|
|
|
+ .project_id(taskEntity.getPId())
|
|
|
+ .task_id(taskEntity.getId())
|
|
|
+ .task_path(taskEntity.getRunResultFilePath())
|
|
|
.default_time(videoTime)
|
|
|
.build())
|
|
|
- .scenario(ScenarioTO.builder()
|
|
|
+ .scenario(ScenarioEntity.builder()
|
|
|
.scenario_osc(xoscPathOfMinio)
|
|
|
.scenario_odr(xodrPathOfMinio)
|
|
|
.scenario_osgb(osgbPathOfMinio)
|
|
|
.build())
|
|
|
.vehicle(VehicleTO.builder()
|
|
|
- .model(ModelTO.builder()
|
|
|
- .model_label(vehiclePO.getModelLabel())
|
|
|
+ .model(ModelEntity.builder()
|
|
|
+ .model_label(vehicleEntity.getModelLabel())
|
|
|
.build())
|
|
|
.dynamics(null)
|
|
|
- .sensors(SensorsTO.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
- .camera(cameraPOList)
|
|
|
- .OGT(ogtPOList)
|
|
|
+ .sensors(SensorsEntity.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
+ .camera(cameraEntityList)
|
|
|
+ .OGT(ogtEntityList)
|
|
|
.build())
|
|
|
.build())
|
|
|
.build();
|
|
|
|
|
|
- FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskTO), projectPath + taskId + ".json");
|
|
|
- log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskTO.getInfo().getTask_id());
|
|
|
+ FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
|
|
|
+ log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
}
|
|
|
}
|
|
|
taskManager.batchInsertTask(taskList);
|
|
@@ -399,37 +398,37 @@ public class ProjectConsumer {
|
|
|
return;
|
|
|
}
|
|
|
//3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
|
- UserPO userPO = userMapper.selectById(userId);
|
|
|
- log.info("cacheManualProject() 项目 " + projectId + " 的创建人为:" + userPO);
|
|
|
- String roleCode = userPO.getRoleCode();
|
|
|
- String useType = userPO.getUseType();
|
|
|
- ClusterPO clusterPO;
|
|
|
+ UserEntity userEntity = userMapper.selectById(userId);
|
|
|
+ log.info("cacheManualProject() 项目 " + projectId + " 的创建人为:" + userEntity);
|
|
|
+ String roleCode = userEntity.getRoleCode();
|
|
|
+ String useType = userEntity.getUseType();
|
|
|
+ ClusterEntity clusterEntity;
|
|
|
if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
|
- PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
+ PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
return;
|
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
- clusterPO = clusterMapper.selectByUserId(userId);
|
|
|
- log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterPO);
|
|
|
+ clusterEntity = clusterMapper.selectByUserId(userId);
|
|
|
+ log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterEntity);
|
|
|
} else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
|
if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) { //3-3 普通子账户,根据自己的独占节点排队
|
|
|
- clusterPO = clusterMapper.selectByUserId(userId);
|
|
|
- log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通独占子账户(自己的集群),集群为:" + clusterPO);
|
|
|
+ clusterEntity = clusterMapper.selectByUserId(userId);
|
|
|
+ log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通独占子账户(自己的集群),集群为:" + clusterEntity);
|
|
|
} else { //3-4 共享子账户,根据父账户的共享节点排队
|
|
|
- String parentUserId = userPO.getCreateUserId();
|
|
|
- clusterPO = clusterMapper.selectByUserId(parentUserId);
|
|
|
- log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通共享子账户(父账户的集群),集群为:" + clusterPO);
|
|
|
+ String parentUserId = userEntity.getCreateUserId();
|
|
|
+ clusterEntity = clusterMapper.selectByUserId(parentUserId);
|
|
|
+ log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通共享子账户(父账户的集群),集群为:" + clusterEntity);
|
|
|
}
|
|
|
} else {
|
|
|
log.error("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
|
|
|
return;
|
|
|
}
|
|
|
// 获取拥有的节点数量,即仿真软件证书数量
|
|
|
- String clusterId = clusterPO.getId();
|
|
|
- int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
|
|
|
+ String clusterId = clusterEntity.getId();
|
|
|
+ int simulationLicenseNumber = clusterEntity.getNumSimulationLicense();
|
|
|
// 获取该集群中正在运行的项目,如果没有则立即执行
|
|
|
- PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
|
|
|
+ PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
|
|
|
// 获取正在运行的项目的并行度总和
|
|
|
int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
|
|
|
// 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
@@ -476,9 +475,9 @@ public class ProjectConsumer {
|
|
|
String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
String modelType = projectMessageDTO.getModelType();
|
|
|
String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
|
|
|
- ProjectPO projectPO = projectUtil.getProjectByProjectId(projectId);
|
|
|
- log.info("项目 " + projectId + " 信息为:" + projectPO);
|
|
|
- String isChoiceGpu = projectPO.getIsChoiceGpu();
|
|
|
+ ProjectEntity projectEntity = projectUtil.getProjectByProjectId(projectId);
|
|
|
+ log.info("项目 " + projectId + " 信息为:" + projectEntity);
|
|
|
+ String isChoiceGpu = projectEntity.getIsChoiceGpu();
|
|
|
// 项目类型
|
|
|
int currentParallelism = projectMessageDTO.getCurrentParallelism(); // 当前并行度
|
|
|
|
|
@@ -514,7 +513,7 @@ public class ProjectConsumer {
|
|
|
//* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
|
|
|
String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
|
|
|
// -------------------------------- 4 发送任务消息 --------------------------------
|
|
|
- List<NodeTO> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
|
|
|
+ List<NodeEntity> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
|
|
|
int messageNumber = 0;
|
|
|
ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1); // 创建主题
|
|
|
TimeUnit.SECONDS.sleep(7);
|
|
@@ -537,24 +536,24 @@ public class ProjectConsumer {
|
|
|
//4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
|
|
|
// 选一个 count 最少的 node
|
|
|
String currentNodeName = "";
|
|
|
- NodeTO currentNodeTO = null;
|
|
|
+ NodeEntity currentNodeEntity = null;
|
|
|
int currentCount = Integer.MAX_VALUE;
|
|
|
log.info("各节点已经预定的任务个数为:" + nodeListToCount);
|
|
|
- for (NodeTO nodeTO : nodeListToCount) {
|
|
|
- int tempCount = nodeTO.getCount();
|
|
|
- String tempNodeName = nodeTO.getNodeName();
|
|
|
+ for (NodeEntity nodeEntity : nodeListToCount) {
|
|
|
+ int tempCount = nodeEntity.getCount();
|
|
|
+ String tempNodeName = nodeEntity.getNodeName();
|
|
|
if (tempCount < currentCount) {
|
|
|
currentCount = tempCount;
|
|
|
currentNodeName = tempNodeName;
|
|
|
- currentNodeTO = nodeTO;
|
|
|
+ currentNodeEntity = nodeEntity;
|
|
|
}
|
|
|
}
|
|
|
- if (currentNodeTO == null) {
|
|
|
+ if (currentNodeEntity == null) {
|
|
|
String errorMessage = "挑选节点失败。";
|
|
|
log.info(errorMessage);
|
|
|
throw new RuntimeException(errorMessage);
|
|
|
}
|
|
|
- currentNodeTO.setCount(currentNodeTO.getCount() + 1);
|
|
|
+ currentNodeEntity.setCount(currentNodeEntity.getCount() + 1);
|
|
|
Integer cpuOrder = null;
|
|
|
if (currentCount == 0) {
|
|
|
// 根据各节点剩余并行度,倒序获取 cpu 编号
|