|
@@ -1,20 +1,20 @@
|
|
|
-package com.css.simulation.resource.scheduler.service;
|
|
|
+package com.css.simulation.resource.scheduler.application.service;
|
|
|
|
|
|
import api.common.pojo.constants.DictConstants;
|
|
|
-import api.common.pojo.dto.ProjectMessageDTO;
|
|
|
import api.common.pojo.po.scheduler.SchedulerProjectPO;
|
|
|
import api.common.util.*;
|
|
|
-import com.css.simulation.resource.scheduler.configuration.custom.CustomConfiguration;
|
|
|
-import com.css.simulation.resource.scheduler.configuration.docker.DockerConfiguration;
|
|
|
-import com.css.simulation.resource.scheduler.configuration.git.GitConfiguration;
|
|
|
-import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
|
|
|
-import com.css.simulation.resource.scheduler.configuration.minio.MinioConfiguration;
|
|
|
-import com.css.simulation.resource.scheduler.configuration.redis.CustomRedisClient;
|
|
|
-import com.css.simulation.resource.scheduler.data.entity.*;
|
|
|
-import com.css.simulation.resource.scheduler.data.model.DynamicsModel;
|
|
|
-import com.css.simulation.resource.scheduler.data.model.VehicleModel;
|
|
|
-import com.css.simulation.resource.scheduler.mapper.*;
|
|
|
-import com.css.simulation.resource.scheduler.util.*;
|
|
|
+import com.css.simulation.resource.scheduler.api.model.ProjectMessageModel;
|
|
|
+import com.css.simulation.resource.scheduler.application.entity.VehicleEntity;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.configuration.custom.CustomConfiguration;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.configuration.docker.DockerConfiguration;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.configuration.entity.DynamicsEntity;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.configuration.git.GitConfiguration;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.configuration.kubernetes.KubernetesConfiguration;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.configuration.minio.MinioConfiguration;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.entity.*;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.persistence.mysql.mapper.*;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.persistence.redis.CustomRedisClient;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.util.*;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import io.minio.MinioClient;
|
|
|
import lombok.SneakyThrows;
|
|
@@ -115,19 +115,19 @@ public class ProjectService {
|
|
|
/**
|
|
|
* 接收到运行信息立即复制一份数据作为运行数据
|
|
|
*
|
|
|
- * @param projectMessageDTO 项目启动消息
|
|
|
+ * @param projectMessageModel 项目启动消息
|
|
|
*/
|
|
|
- public void createTaskAndFixData(ProjectMessageDTO projectMessageDTO) {
|
|
|
+ public void createTaskAndFixData(ProjectMessageModel projectMessageModel) {
|
|
|
//* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
|
|
|
- String projectId = projectMessageDTO.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
- String projectType = projectMessageDTO.getType(); // 项目类型
|
|
|
+ String projectId = projectMessageModel.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
+ String projectType = projectMessageModel.getType(); // 项目类型
|
|
|
String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
|
|
|
try {
|
|
|
- String modelType = projectMessageDTO.getModelType(); // 模型类型,1 动力学模型 2 carsim模型
|
|
|
- String packageId = projectMessageDTO.getScenePackageId(); // 场景测试包 id
|
|
|
- String vehicleConfigId = projectMessageDTO.getVehicleConfigId(); // 模型配置 id
|
|
|
- String algorithmId = projectMessageDTO.getAlgorithmId(); // 模型配置 id
|
|
|
- long videoTime = projectMessageDTO.getMaxSimulationTime(); // 结果视频的时长
|
|
|
+ String modelType = projectMessageModel.getModelType(); // 模型类型,1 动力学模型 2 carsim模型
|
|
|
+ String packageId = projectMessageModel.getScenePackageId(); // 场景测试包 id
|
|
|
+ String vehicleConfigId = projectMessageModel.getVehicleConfigId(); // 模型配置 id
|
|
|
+ String algorithmId = projectMessageModel.getAlgorithmId(); // 模型配置 id
|
|
|
+ long videoTime = projectMessageModel.getMaxSimulationTime(); // 结果视频的时长
|
|
|
String userId = projectUtil.getUserIdByProjectIdAndProjectType(projectId, projectType);
|
|
|
String projectPath = linuxTempPath + "project/" + projectId + "/";
|
|
|
FileUtil.mkdir(projectPath);
|
|
@@ -140,24 +140,24 @@ public class ProjectService {
|
|
|
//根据场景测试包 packageId,拿到场景集合(包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
|
|
|
List<SceneEntity> sceneEntityList = getSceneList(projectId, packageId);
|
|
|
int taskTotal = sceneEntityList.size();
|
|
|
- projectMessageDTO.setTaskTotal(taskTotal);
|
|
|
- projectMessageDTO.setTaskCompleted(0);
|
|
|
+ projectMessageModel.setTaskTotal(taskTotal);
|
|
|
+ projectMessageModel.setTaskCompleted(0);
|
|
|
//去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
|
|
|
Set<SceneEntity> sceneEntitySet = new HashSet<>(sceneEntityList);
|
|
|
- log.info("项目 " + projectId + " 场景包括:" + sceneEntitySet);
|
|
|
+ log.debug("项目 " + projectId + " 场景包括:" + sceneEntitySet);
|
|
|
// -------------------------------- 2 算法导入 --------------------------------
|
|
|
log.info("项目 " + projectId + " 开始算法导入。");
|
|
|
String algorithmDockerImage = handleAlgorithm(projectId, algorithmId);
|
|
|
log.info("项目 " + projectId + " 算法已导入:" + algorithmDockerImage);
|
|
|
// -------------------------------- 3 查询模型 --------------------------------
|
|
|
+ log.debug("项目 " + projectId + " 开始查询模型。");
|
|
|
+ //1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
|
+ com.css.simulation.resource.scheduler.infrastructure.entity.VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
+ List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
+ List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
+ log.debug("项目 " + projectId + " 开始保存任务消息。");
|
|
|
if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
|
|
|
- log.debug("项目 " + projectId + " 开始查询模型。");
|
|
|
- //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
|
- VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
- List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
- List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
// -------------------------------- 4 保存任务消息 --------------------------------
|
|
|
- log.debug("项目 " + projectId + " 开始保存任务消息。");
|
|
|
List<TaskEntity> taskList = new ArrayList<>();
|
|
|
for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
|
String sceneId = sceneEntity.getId();
|
|
@@ -218,21 +218,16 @@ public class ProjectService {
|
|
|
FileUtil.rm(osgbPathOfLinux); // 删除临时文件
|
|
|
|
|
|
// 组装 task 消息
|
|
|
- TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleModel.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(DynamicsModel.builder().dynamics_maxspeed(vehicleEntity.getMaxSpeed()).dynamics_enginepower(vehicleEntity.getEnginePower()).dynamics_maxdecel(vehicleEntity.getMaxDeceleration()).dynamics_maxsteering(vehicleEntity.getMaxSteeringAngle()).dynamics_mass(vehicleEntity.getMass()).dynamics_frontsurfaceeffective(vehicleEntity.getFrontSurfaceEffective()).dynamics_airdragcoefficient(vehicleEntity.getAirDragCoefficient()).dynamics_rollingresistance(vehicleEntity.getRollingResistanceCoefficient()).dynamics_wheeldiameter(vehicleEntity.getWheelDiameter()).dynamics_wheeldrive(vehicleEntity.getWheelDrive()).dynamics_overallefficiency(vehicleEntity.getOverallEfficiency()).dynamics_distfront(vehicleEntity.getFrontDistance()).dynamics_distrear(vehicleEntity.getRearDistance()).dynamics_distleft(vehicleEntity.getLeftDistance()).dynamics_distright(vehicleEntity.getRightDistance()).dynamics_distheight(vehicleEntity.getHeightDistance()).dynamics_wheelbase(vehicleEntity.getWheelbase()).build()).sensors(SensorsEntity.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
+ TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleEntity.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(DynamicsEntity.builder().dynamics_maxspeed(vehicleEntity.getMaxSpeed()).dynamics_enginepower(vehicleEntity.getEnginePower()).dynamics_maxdecel(vehicleEntity.getMaxDeceleration()).dynamics_maxsteering(vehicleEntity.getMaxSteeringAngle()).dynamics_mass(vehicleEntity.getMass()).dynamics_frontsurfaceeffective(vehicleEntity.getFrontSurfaceEffective()).dynamics_airdragcoefficient(vehicleEntity.getAirDragCoefficient()).dynamics_rollingresistance(vehicleEntity.getRollingResistanceCoefficient()).dynamics_wheeldiameter(vehicleEntity.getWheelDiameter()).dynamics_wheeldrive(vehicleEntity.getWheelDrive()).dynamics_overallefficiency(vehicleEntity.getOverallEfficiency()).dynamics_distfront(vehicleEntity.getFrontDistance()).dynamics_distrear(vehicleEntity.getRearDistance()).dynamics_distleft(vehicleEntity.getLeftDistance()).dynamics_distright(vehicleEntity.getRightDistance()).dynamics_distheight(vehicleEntity.getHeightDistance()).dynamics_wheelbase(vehicleEntity.getWheelbase()).build()).sensors(SensorsEntity.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
.camera(cameraEntityList).OGT(ogtEntityList).build()).build()).build();
|
|
|
FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
|
|
|
- log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
+ log.debug("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
}
|
|
|
}
|
|
|
taskUtil.batchInsertTask(taskList);
|
|
|
log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
|
|
|
} else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
|
|
|
- log.debug("项目 " + projectId + " 开始查询模型。");
|
|
|
- VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
- List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
- List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
- // -------------------------------- 4 保存任务消息 --------------------------------
|
|
|
- log.debug("项目 " + projectId + " 开始保存任务消息。");
|
|
|
+
|
|
|
List<TaskEntity> taskList = new ArrayList<>();
|
|
|
for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
|
String sceneId = sceneEntity.getId();
|
|
@@ -292,11 +287,11 @@ public class ProjectService {
|
|
|
|
|
|
// 组装 task 消息
|
|
|
// carsim 不需要查询模型参数
|
|
|
- TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleModel.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(null).sensors(SensorsEntity.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
+ TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleEntity.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(null).sensors(SensorsEntity.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
.camera(cameraEntityList).OGT(ogtEntityList).build()).build()).build();
|
|
|
|
|
|
FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
|
|
|
- log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
+ log.debug("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
}
|
|
|
}
|
|
|
taskUtil.batchInsertTask(taskList);
|
|
@@ -304,7 +299,7 @@ public class ProjectService {
|
|
|
}
|
|
|
|
|
|
//* -------------------------------- 4 开始排队 --------------------------------
|
|
|
- cacheProject(projectMessageDTO);
|
|
|
+ cacheProject(projectMessageModel);
|
|
|
} catch (Exception e) {
|
|
|
log.error("项目报错。", e);
|
|
|
stopProject(isChoiceGpu, projectId, projectType, e.getMessage());
|
|
@@ -317,16 +312,16 @@ public class ProjectService {
|
|
|
/**
|
|
|
* 任务运行前首先判断用户是否拥有可分配资源
|
|
|
*
|
|
|
- * @param projectMessageDTO 项目启动消息
|
|
|
+ * @param projectMessageModel 项目启动消息
|
|
|
*/
|
|
|
@SneakyThrows
|
|
|
- public void cacheProject(ProjectMessageDTO projectMessageDTO) {
|
|
|
- log.debug("判断用户是否拥有可分配资源:" + projectMessageDTO);
|
|
|
+ public void cacheProject(ProjectMessageModel projectMessageModel) {
|
|
|
+ log.debug("判断用户是否拥有可分配资源:" + projectMessageModel);
|
|
|
//1 读取 kafka 的 project 信息
|
|
|
- final String modelType = projectMessageDTO.getModelType();
|
|
|
- String projectId = projectMessageDTO.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
- long parallelism = projectMessageDTO.getParallelism(); // 项目并行度
|
|
|
- String projectType = projectMessageDTO.getType(); // 项目类型
|
|
|
+ final String modelType = projectMessageModel.getModelType();
|
|
|
+ String projectId = projectMessageModel.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
+ long parallelism = projectMessageModel.getParallelism(); // 项目并行度
|
|
|
+ String projectType = projectMessageModel.getType(); // 项目类型
|
|
|
//2 获取用户信息(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
|
final UserEntity userEntity = projectUtil.getUserEntityByProjectIdAndProjectType(projectId, projectType);
|
|
|
String projectUserId = userEntity.getId();
|
|
@@ -339,7 +334,7 @@ public class ProjectService {
|
|
|
clusterUserId = DictConstants.SYSTEM_CLUSTER_ID;
|
|
|
log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
|
PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
- run(projectMessageDTO, clusterUserId, modelType, DictConstants.SYSTEM_USER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
+ run(projectMessageModel, clusterUserId, modelType, DictConstants.SYSTEM_USER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
return;
|
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
clusterUserId = projectUserId;
|
|
@@ -367,40 +362,67 @@ public class ProjectService {
|
|
|
final Integer numSimulationLicense = clusterEntity.getNumSimulationLicense();
|
|
|
final Integer numDynamicLicense = clusterEntity.getNumDynamicLicense();
|
|
|
final String clusterId = clusterEntity.getId();
|
|
|
- //1 判断仿真证书是否够用,如果
|
|
|
+ final String projectWaitingKey = redisPrefix.getProjectWaitingKey();
|
|
|
+ final String projectRunningKey = redisPrefix.getProjectRunningKey();
|
|
|
+ //1 判断仿真证书是否够用,如果证书为0则将项目加入等待队列;如果证书小于并行度则加入扩充队列,并用现有证书执行;如果证书够用,直接执行。
|
|
|
+ final int remainderSimulationLicense = numSimulationLicense - usingSimulationLicenseNumber;
|
|
|
+ final int remainderDynamicLicense;
|
|
|
+
|
|
|
// 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
|
if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
|
|
|
- if (usingSimulationLicenseNumber + parallelism <= numSimulationLicense) {
|
|
|
- run(projectMessageDTO, clusterUserId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
+ if (remainderSimulationLicense <= 0) {
|
|
|
+ wait(DictConstants.PROJECT_WAIT_TYPE_EXECUTE, projectWaitingKey, projectMessageModel);
|
|
|
+ } else if (remainderSimulationLicense < parallelism) {
|
|
|
+ wait(DictConstants.PROJECT_WAIT_TYPE_EXPAND, projectWaitingKey, projectMessageModel);
|
|
|
} else {
|
|
|
- log.info("VTD 项目 {} 并行度为 {},用户 {} 的集群 {} 的仿真证书总数量为 {},已使用数量为 {}。该项目加入等待队列。 ", projectId, parallelism, clusterUserId, clusterId, numSimulationLicense, usingSimulationLicenseNumber);
|
|
|
- wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
|
|
|
+ run(projectMessageModel, clusterUserId, modelType, clusterId, projectRunningKey, projectWaitingKey);
|
|
|
}
|
|
|
} else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
|
|
|
usingDynamicLicenseNumber = projectUtil.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
|
|
|
- if (usingSimulationLicenseNumber + parallelism <= numSimulationLicense && usingDynamicLicenseNumber + parallelism <= numDynamicLicense) {
|
|
|
- run(projectMessageDTO, clusterUserId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
+ remainderDynamicLicense = numDynamicLicense - usingDynamicLicenseNumber;
|
|
|
+ if (remainderSimulationLicense <= 0 || remainderDynamicLicense <= 0) {
|
|
|
+ wait(DictConstants.PROJECT_WAIT_TYPE_EXECUTE, projectWaitingKey, projectMessageModel);
|
|
|
+ } else if (remainderSimulationLicense < parallelism || remainderDynamicLicense < parallelism) {
|
|
|
+ wait(DictConstants.PROJECT_WAIT_TYPE_EXPAND, projectWaitingKey, projectMessageModel);
|
|
|
} else {
|
|
|
- log.info("CARSIM 项目 {} 并行度为 {},用户 {} 的集群 {} 的仿真证书总数量为 {},已使用数量为 {},动力学证书总数量为 {},已使用数量为 {}。该项目加入等待队列。 ", projectId, parallelism, clusterUserId, clusterId, numSimulationLicense, usingSimulationLicenseNumber, numDynamicLicense, usingDynamicLicenseNumber);
|
|
|
- wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
|
|
|
+ run(projectMessageModel, clusterUserId, modelType, clusterId, projectRunningKey, projectWaitingKey);
|
|
|
}
|
|
|
} else {
|
|
|
throw new RuntimeException("未知模型类型:" + modelType);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ //* -------------------------------- 等待 --------------------------------
|
|
|
+
|
|
|
+ /**
+ * Puts a project into the waiting state by storing its message, serialized
+ * with JsonUtil.beanToJson, under the given waiting key via customRedisClient.
+ * NOTE(review): the wait type is checked but both branches are currently empty,
+ * so every wait type results in the same single write.
+ *
|
|
|
+ * @param waitType wait type: 1 = waiting to execute, 2 = waiting to expand
|
|
|
+ * @param projectWaitingKey waiting key of the project
|
|
|
+ * @param projectMessageModel project information
|
|
|
+ */
|
|
|
+ public void wait(String waitType, String projectWaitingKey, ProjectMessageModel projectMessageModel) {
|
|
|
+ //1 Fetch the waiting list from redis. NOTE(review): not implemented yet, nothing is read from redis in this method.
|
|
|
+
|
|
|
+ if (DictConstants.PROJECT_WAIT_TYPE_EXECUTE.equals(waitType)) {
|
|
|
+ // NOTE(review): empty branch, no handling specific to the "waiting to execute" type is present. TODO confirm intended behavior.
|
|
|
+ } else if (DictConstants.PROJECT_WAIT_TYPE_EXPAND.equals(waitType)) {
|
|
|
+ // NOTE(review): empty branch, no handling specific to the "waiting to expand" type is present. TODO confirm intended behavior.
|
|
|
+ }
|
|
|
+ customRedisClient.set(projectWaitingKey, JsonUtil.beanToJson(projectMessageModel));
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
//* -------------------------------- 运行 --------------------------------
|
|
|
|
|
|
/**
|
|
|
- * @param projectMessageDTO 初始接收到的项目启动信息
|
|
|
+ * @param projectMessageModel 初始接收到的项目启动信息
|
|
|
* @param clusterId 集群ID
|
|
|
* @param projectRunningKey projectRunningKey
|
|
|
* @param projectWaitingKey projectWaitingKey
|
|
|
*/
|
|
|
- public void run(ProjectMessageDTO projectMessageDTO, String clusterUserId, String modelType, String clusterId, String projectRunningKey, String projectWaitingKey) {
|
|
|
- String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
+ public void run(ProjectMessageModel projectMessageModel, String clusterUserId, String modelType, String clusterId, String projectRunningKey, String projectWaitingKey) {
|
|
|
+ String projectId = projectMessageModel.getProjectId(); // 项目 id
|
|
|
String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
|
|
|
- int parallelism = projectMessageDTO.getParallelism(); // 期望并行度
|
|
|
+ int parallelism = projectMessageModel.getParallelism(); // 期望并行度
|
|
|
//1 获取集群剩余可用并行度
|
|
|
int restParallelism = projectUtil.getRestParallelism(isChoiceGpu);
|
|
|
//2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
|
|
@@ -410,31 +432,33 @@ public class ProjectService {
|
|
|
projectUtil.useLicense(clusterUserId, modelType, parallelism);
|
|
|
}
|
|
|
// 设置实际的并行度
|
|
|
- projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism)); // 设置实际的并行度
|
|
|
- parseProject(projectMessageDTO, projectRunningKey, isChoiceGpu);
|
|
|
+ projectMessageModel.setCurrentParallelism(Math.min(restParallelism, parallelism)); // 设置实际的并行度
|
|
|
+ parseProject(projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
|
} else {
|
|
|
log.info("服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
|
|
|
- wait(projectWaitingKey, projectMessageDTO);
|
|
|
+// wait(projectWaitingKey, projectMessageDTO);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @param projectMessageDTO 初始接收到的项目启动信息
|
|
|
+ * 运行项目
|
|
|
+ *
|
|
|
+ * @param projectMessageModel 初始接收到的项目启动信息
|
|
|
* @param projectRunningKey projectRunningKey
|
|
|
*/
|
|
|
@SneakyThrows
|
|
|
- public void parseProject(ProjectMessageDTO projectMessageDTO, String projectRunningKey, String isChoiceGpu) {
|
|
|
- String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
- String modelType = projectMessageDTO.getModelType();
|
|
|
- String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
|
|
|
- int currentParallelism = projectMessageDTO.getCurrentParallelism(); // 当前并行度
|
|
|
- String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
|
+ public void parseProject(ProjectMessageModel projectMessageModel, String projectRunningKey, String isChoiceGpu) {
|
|
|
+ String projectId = projectMessageModel.getProjectId(); // 项目 id
|
|
|
+ String modelType = projectMessageModel.getModelType();
|
|
|
+ String vehicleConfigId = projectMessageModel.getVehicleConfigId();
|
|
|
+ int currentParallelism = projectMessageModel.getCurrentParallelism(); // 当前并行度
|
|
|
+ String algorithmId = projectMessageModel.getAlgorithmId(); // 算法 id
|
|
|
String projectPath = linuxTempPath + "project/" + projectId + "/";
|
|
|
// -------------------------------- 1 获取任务 json 列表 --------------------------------
|
|
|
List<String> taskJsonList = FileUtil.listAbsolutePathByTypeAndLength(projectPath, "json", (StringUtil.getRandomUUID() + ".json").length());
|
|
|
int taskTotal = taskJsonList.size();
|
|
|
- projectMessageDTO.setTaskTotal(taskTotal);
|
|
|
- projectMessageDTO.setTaskCompleted(0);
|
|
|
+ projectMessageModel.setTaskTotal(taskTotal);
|
|
|
+ projectMessageModel.setTaskCompleted(0);
|
|
|
// 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
|
|
|
//1 获取剩余并行度和即将使用的各node的并行度
|
|
|
Map<String, Integer> nodeMap0 = projectUtil.getNodeMap(isChoiceGpu);
|
|
@@ -443,9 +467,9 @@ public class ProjectService {
|
|
|
nodeMap.keySet().forEach(nodeName -> projectUtil.decrementParallelism(isChoiceGpu, nodeName, nodeMap.get(nodeName)));
|
|
|
// 重新设置实际使用的并行度并保存到 redis
|
|
|
int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
|
|
|
- projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
|
|
|
+ projectMessageModel.setCurrentParallelism(realCurrentParallelism);
|
|
|
log.info("项目 " + projectId + " 运行在:" + nodeMap);
|
|
|
- stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
|
|
|
+ stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageModel));
|
|
|
//* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
|
|
|
String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
|
|
|
// -------------------------------- 4 发送任务消息 --------------------------------
|
|
@@ -516,17 +540,6 @@ public class ProjectService {
|
|
|
}
|
|
|
|
|
|
|
|
|
- //* -------------------------------- 等待 --------------------------------
|
|
|
-
|
|
|
- /**
|
|
|
- * @param projectWaitingKey 项目等待 key
|
|
|
- * @param projectMessageDTO 项目信息
|
|
|
- */
|
|
|
- public void wait(String projectWaitingKey, ProjectMessageDTO projectMessageDTO) {
|
|
|
- stringRedisTemplate.opsForValue().set(projectWaitingKey, JsonUtil.beanToJson(projectMessageDTO));
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
//* -------------------------------- Comment --------------------------------
|
|
|
|
|
|
/**
|
|
@@ -639,12 +652,10 @@ public class ProjectService {
|
|
|
String leafIndexPrefix = "project:" + projectId + ":package:" + packageId + ":leaf";
|
|
|
//1 查询该场景包的所有指标列表,包删了无所谓,但要过滤删掉的指标。并保存成 json 文件。
|
|
|
List<IndexTemplateEntity> allIndexList = indexTemplateMapper.selectByPackageIdIncludeDeleted(packageId);
|
|
|
-// FileUtil.writeStringToLocalFile(JsonUtil.listToJson(allIndexList), projectPath + "all-index-list.json");
|
|
|
stringRedisTemplate.opsForValue().set(allIndexPrefix, JsonUtil.listToJson(allIndexList));
|
|
|
//2 查询场景包叶子指标
|
|
|
List<IndexTemplateEntity> leafIndexList = allIndexList.stream().filter(index -> StringUtil.isNotEmpty(index.getRuleId())).collect(Collectors.toList());
|
|
|
log.debug("项目 " + projectId + " 的叶子指标为:" + leafIndexList);
|
|
|
-// FileUtil.writeStringToLocalFile(JsonUtil.listToJson(allIndexList), projectPath + "leaf-index-list.json");
|
|
|
stringRedisTemplate.opsForValue().set(leafIndexPrefix, JsonUtil.listToJson(leafIndexList));
|
|
|
List<SceneEntity> sceneList = new ArrayList<>();
|
|
|
leafIndexList.forEach(leafIndex -> {
|
|
@@ -767,7 +778,7 @@ public class ProjectService {
|
|
|
autoSubProjectMapper.saveErrorMessage(SchedulerProjectPO.builder().id(projectId).errorMessage(em).modifyUserId(DictConstants.SCHEDULER_USER_ID).modifyTime(TimeUtil.getNowForMysql()).build());
|
|
|
}
|
|
|
});
|
|
|
- stopProject(isChoiceGpu, projectType, projectId);
|
|
|
+ stopProject( projectType, projectId);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -775,7 +786,8 @@ public class ProjectService {
|
|
|
* @param projectType 项目类型
|
|
|
*/
|
|
|
@SneakyThrows
|
|
|
- public void stopProject(String isChoiceGpu, String projectType, String projectId) {
|
|
|
+ public void stopProject( String projectType, String projectId) {
|
|
|
+ String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
|
|
|
//* -------------------------------- Comment --------------------------------
|
|
|
ProjectEntity projectEntity;
|
|
|
String projectUserId, clusterUserId, modelType, projectRunningKeyPrefix;
|