|
@@ -133,14 +133,14 @@ public class ProjectConsumer {
|
|
String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
|
|
String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
|
|
log.info("项目 " + projectId + " 算法已导入:" + algorithmDockerImage);
|
|
log.info("项目 " + projectId + " 算法已导入:" + algorithmDockerImage);
|
|
// -------------------------------- 3 查询模型 --------------------------------
|
|
// -------------------------------- 3 查询模型 --------------------------------
|
|
- if ("1".equals(modelType)) {
|
|
|
|
- log.info("项目 " + projectId + " 开始查询模型。");
|
|
|
|
|
|
+ if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
|
|
|
|
+ log.debug("项目 " + projectId + " 开始查询模型。");
|
|
//2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
//2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
// -------------------------------- 4 保存任务消息 --------------------------------
|
|
// -------------------------------- 4 保存任务消息 --------------------------------
|
|
- log.info("项目 " + projectId + " 开始保存任务消息。");
|
|
|
|
|
|
+ log.debug("项目 " + projectId + " 开始保存任务消息。");
|
|
List<TaskEntity> taskList = new ArrayList<>();
|
|
List<TaskEntity> taskList = new ArrayList<>();
|
|
for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
String sceneId = sceneEntity.getId();
|
|
String sceneId = sceneEntity.getId();
|
|
@@ -207,14 +207,13 @@ public class ProjectConsumer {
|
|
}
|
|
}
|
|
taskUtil.batchInsertTask(taskList);
|
|
taskUtil.batchInsertTask(taskList);
|
|
log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
|
|
log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
|
|
- } else if ("2".equals(modelType)) {
|
|
|
|
- log.info("项目 " + projectId + " 开始查询模型。");
|
|
|
|
-
|
|
|
|
|
|
+ } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
|
|
|
|
+ log.debug("项目 " + projectId + " 开始查询模型。");
|
|
VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
// -------------------------------- 4 保存任务消息 --------------------------------
|
|
// -------------------------------- 4 保存任务消息 --------------------------------
|
|
- log.info("项目 " + projectId + " 开始保存任务消息。");
|
|
|
|
|
|
+ log.debug("项目 " + projectId + " 开始保存任务消息。");
|
|
List<TaskEntity> taskList = new ArrayList<>();
|
|
List<TaskEntity> taskList = new ArrayList<>();
|
|
for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
String sceneId = sceneEntity.getId();
|
|
String sceneId = sceneEntity.getId();
|
|
@@ -317,12 +316,7 @@ public class ProjectConsumer {
|
|
} else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
} else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
} else {
|
|
} else {
|
|
- log.error("项目类型错误:" + projectMessageDTO);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- if (StringUtil.isEmpty(userId)) {
|
|
|
|
- log.error("未查询到项目创建人:" + projectMessageDTO);
|
|
|
|
- return;
|
|
|
|
|
|
+ throw new RuntimeException("未知项目类型:" + projectType);
|
|
}
|
|
}
|
|
//3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
//3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
UserEntity userEntity = userMapper.selectById(userId);
|
|
UserEntity userEntity = userMapper.selectById(userId);
|
|
@@ -333,7 +327,7 @@ public class ProjectConsumer {
|
|
if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
log.info("项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
log.info("项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
- run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
|
|
|
+ run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, modelType, DictConstants.SYSTEM_USER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
return;
|
|
return;
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
clusterEntity = clusterMapper.selectByUserId(userId);
|
|
clusterEntity = clusterMapper.selectByUserId(userId);
|
|
@@ -348,22 +342,34 @@ public class ProjectConsumer {
|
|
log.info("项目 " + projectId + " 的创建人 " + userId + " 为普通共享子账户(父账户的集群),集群为:" + clusterEntity);
|
|
log.info("项目 " + projectId + " 的创建人 " + userId + " 为普通共享子账户(父账户的集群),集群为:" + clusterEntity);
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- log.error("项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
|
|
|
|
- return;
|
|
|
|
|
|
+ throw new RuntimeException("未知角色类型:" + roleCode);
|
|
}
|
|
}
|
|
- // 获取拥有的节点数量,即仿真软件证书数量
|
|
|
|
- String clusterId = clusterEntity.getId();
|
|
|
|
- int simulationLicenseNumber = clusterEntity.getNumSimulationLicense();
|
|
|
|
- // 获取该集群中正在运行的项目,如果没有则立即执行
|
|
|
|
- PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
|
|
|
|
- // 获取正在运行的项目的并行度总和
|
|
|
|
- int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
|
|
|
|
- // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
|
|
- if (currentParallelismSum + parallelism <= simulationLicenseNumber) {
|
|
|
|
- run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
|
|
|
+ PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterEntity.getId(), projectId);
|
|
|
|
+ if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
|
|
|
|
+ // 获取仿真软件证书数量和动力学软件证书数量(vtd占一个仿真证书,carsim各占一个)
|
|
|
|
+// // 获取正在运行的项目的并行度总和
|
|
|
|
+// int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
|
|
|
|
+ // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
|
|
+ if (projectUtil.getUsingLicenseNumber(userId, DictConstants.MODEL_TYPE_VTD) + parallelism <= clusterEntity.getNumSimulationLicense()) {
|
|
|
|
+ run(projectMessageDTO, userId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
|
+ } else {
|
|
|
|
+ log.info("项目 " + projectId + " 并行度超出账户允许,加入等待队列,暂不执行。 ");
|
|
|
|
+ wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
|
|
|
|
+ }
|
|
|
|
+ } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
|
|
|
|
+ // 获取仿真软件证书数量和动力学软件证书数量(vtd占一个仿真证书,carsim各占一个)
|
|
|
|
+// // 获取正在运行的项目的并行度总和
|
|
|
|
+// int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
|
|
|
|
+ // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
|
|
+ if (projectUtil.getUsingLicenseNumber(userId, DictConstants.MODEL_TYPE_VTD) + parallelism <= clusterEntity.getNumSimulationLicense()
|
|
|
|
+ && projectUtil.getUsingLicenseNumber(userId, DictConstants.MODEL_TYPE_CARSIM) + parallelism <= clusterEntity.getNumDynamicLicense()) {
|
|
|
|
+ run(projectMessageDTO, userId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
|
+ } else {
|
|
|
|
+ log.info("项目 " + projectId + " 并行度超出账户允许,加入等待队列,暂不执行。 ");
|
|
|
|
+ wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
|
|
|
|
+ }
|
|
} else {
|
|
} else {
|
|
- log.info("项目 " + projectId + " 并行度超出账户允许,加入等待队列,暂不执行。 ");
|
|
|
|
- wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
|
|
|
|
|
|
+ throw new RuntimeException("未知模型类型:" + modelType);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -371,11 +377,12 @@ public class ProjectConsumer {
|
|
|
|
|
|
/**
|
|
/**
|
|
* @param projectMessageDTO 初始接收到的项目启动信息
|
|
* @param projectMessageDTO 初始接收到的项目启动信息
|
|
- * @param clusterId 集群 id
|
|
|
|
|
|
+ * @param userId 用户ID
|
|
|
|
+ * @param clusterId 集群ID
|
|
* @param projectRunningKey projectRunningKey
|
|
* @param projectRunningKey projectRunningKey
|
|
* @param projectWaitingKey projectWaitingKey
|
|
* @param projectWaitingKey projectWaitingKey
|
|
*/
|
|
*/
|
|
- public void run(ProjectMessageDTO projectMessageDTO, String clusterId, String projectRunningKey, String projectWaitingKey) {
|
|
|
|
|
|
+ public void run(ProjectMessageDTO projectMessageDTO, String userId, String modelType, String clusterId, String projectRunningKey, String projectWaitingKey) {
|
|
|
|
|
|
String projectId = projectMessageDTO.getProjectId();
|
|
String projectId = projectMessageDTO.getProjectId();
|
|
int parallelism = projectMessageDTO.getParallelism(); // 期望并行度
|
|
int parallelism = projectMessageDTO.getParallelism(); // 期望并行度
|
|
@@ -384,6 +391,9 @@ public class ProjectConsumer {
|
|
//2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
|
|
//2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
|
|
if (restParallelism > 0L) {
|
|
if (restParallelism > 0L) {
|
|
log.info("集群 " + clusterId + " 执行项目 " + projectId);
|
|
log.info("集群 " + clusterId + " 执行项目 " + projectId);
|
|
|
|
+ if (!DictConstants.SYSTEM_USER_ID.equals(userId)) {
|
|
|
|
+ projectUtil.useLicense(userId, modelType, parallelism);
|
|
|
|
+ }
|
|
// 设置实际的并行度
|
|
// 设置实际的并行度
|
|
projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism)); // 设置实际的并行度
|
|
projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism)); // 设置实际的并行度
|
|
parseProject(projectMessageDTO, projectRunningKey);
|
|
parseProject(projectMessageDTO, projectRunningKey);
|
|
@@ -408,7 +418,7 @@ public class ProjectConsumer {
|
|
String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
String projectPath = linuxTempPath + "project/" + projectId + "/";
|
|
String projectPath = linuxTempPath + "project/" + projectId + "/";
|
|
// -------------------------------- 1 获取任务 json 列表 --------------------------------
|
|
// -------------------------------- 1 获取任务 json 列表 --------------------------------
|
|
- List<String> taskJsonList = FileUtil.listAbsolutePathByTypeAndLength(projectPath, "json", (StringUtil.getRandomUUID()+".json").length());
|
|
|
|
|
|
+ List<String> taskJsonList = FileUtil.listAbsolutePathByTypeAndLength(projectPath, "json", (StringUtil.getRandomUUID() + ".json").length());
|
|
int taskTotal = taskJsonList.size();
|
|
int taskTotal = taskJsonList.size();
|
|
projectMessageDTO.setTaskTotal(taskTotal);
|
|
projectMessageDTO.setTaskTotal(taskTotal);
|
|
projectMessageDTO.setTaskCompleted(0);
|
|
projectMessageDTO.setTaskCompleted(0);
|