|
@@ -27,7 +27,7 @@ import java.util.Set;
|
|
|
|
|
|
@Component
|
|
@Component
|
|
@Slf4j
|
|
@Slf4j
|
|
-public class ManualProjectConsumer {
|
|
|
|
|
|
+public class ProjectConsumer {
|
|
|
|
|
|
@Autowired
|
|
@Autowired
|
|
KafkaTemplate<String, String> kafkaTemplate;
|
|
KafkaTemplate<String, String> kafkaTemplate;
|
|
@@ -88,68 +88,96 @@ public class ManualProjectConsumer {
|
|
@KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.topic}")
|
|
@KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.topic}")
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
public void cacheManualProject(ConsumerRecord<String, String> projectRecord) {
|
|
public void cacheManualProject(ConsumerRecord<String, String> projectRecord) {
|
|
- log.info("ManualProjectConsumer--cacheManualProject 接收到项目开始消息为:" + projectRecord);
|
|
|
|
String projectJson = projectRecord.value();
|
|
String projectJson = projectRecord.value();
|
|
|
|
+ log.info("ProjectConsumer--cacheManualProject 接收到项目开始消息为:" + projectJson);
|
|
//1 读取 kafka 的 project 信息
|
|
//1 读取 kafka 的 project 信息
|
|
ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
- String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
|
- String packageId = projectMessageDTO.getScenePackageId(); // 场景测试包 id
|
|
|
|
- int maxSimulationTime = projectMessageDTO.getMaxSimulationTime(); // 最大仿真时间,即生成视频的时间长度
|
|
|
|
- String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
|
|
|
|
- String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
|
|
- int parallelism = projectMessageDTO.getParallelism();
|
|
|
|
|
|
+ String projectId = projectMessageDTO.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
|
+ Long parallelism = projectMessageDTO.getParallelism(); // 项目并行度
|
|
|
|
+ String projectType = projectMessageDTO.getProjectType(); // 项目类型
|
|
//2 根据 projectId 获取创建用户 id
|
|
//2 根据 projectId 获取创建用户 id
|
|
- String userId = manualProjectMapper.selectCreateUserById(projectId);
|
|
|
|
|
|
+ String userId;
|
|
|
|
+ if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
|
+ userId = manualProjectMapper.selectCreateUserById(projectId);
|
|
|
|
+ } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
|
|
+ userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
|
|
+ } else {
|
|
|
|
+ log.error("ProjectConsumer--cacheManualProject 项目类型错误:" + projectJson);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ if (StringUtil.isEmpty(userId)) {
|
|
|
|
+ log.error("ProjectConsumer--cacheManualProject 未查询到项目创建人:" + projectJson);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
//3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
//3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
UserPO userPO = userMapper.selectById(userId);
|
|
UserPO userPO = userMapper.selectById(userId);
|
|
String roleCode = userPO.getRoleCode();
|
|
String roleCode = userPO.getRoleCode();
|
|
String useType = userPO.getUseType();
|
|
String useType = userPO.getUseType();
|
|
ClusterPO clusterPO;
|
|
ClusterPO clusterPO;
|
|
|
|
+ String clusterPrefix;
|
|
|
|
+ String clusterRunningPrefix;
|
|
|
|
+ String clusterWaitingPrefix;
|
|
|
|
+ String projectRunningKey;
|
|
|
|
+ String projectWaitingKey;
|
|
if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
- parseManualProject(projectJson);
|
|
|
|
|
|
+ clusterPrefix = "cluster:" + DictConstants.SYSTEM_CLUSTER_ID;
|
|
|
|
+ clusterRunningPrefix = clusterPrefix + ":running";
|
|
|
|
+ projectRunningKey = clusterRunningPrefix + ":" + projectId;
|
|
|
|
+ run(DictConstants.SYSTEM_CLUSTER_ID, projectId, projectRunningKey, projectJson);
|
|
return;
|
|
return;
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
- // 获取拥有的节点数量,即仿真软件证书数量
|
|
|
|
clusterPO = clusterMapper.selectByUserId(userId);
|
|
clusterPO = clusterMapper.selectByUserId(userId);
|
|
} else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
} else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
- if (DictConstants.USE_TYPE_EXCLUSIVE.equals(useType)) { //3-2 普通子账户,根据自己的独占节点排队
|
|
|
|
|
|
+ if (DictConstants.USE_TYPE_EXCLUSIVE.equals(useType)) { //3-3 普通子账户,根据自己的独占节点排队
|
|
clusterPO = clusterMapper.selectByUserId(userId);
|
|
clusterPO = clusterMapper.selectByUserId(userId);
|
|
- } else { // 共享子账户需要查询父账户的集群 id
|
|
|
|
|
|
+ } else { //3-4 共享子账户,根据父账户的共享节点排队
|
|
String parentUserId = userPO.getCreateUserId();
|
|
String parentUserId = userPO.getCreateUserId();
|
|
clusterPO = clusterMapper.selectByUserId(parentUserId);
|
|
clusterPO = clusterMapper.selectByUserId(parentUserId);
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- parseManualProject(projectJson);
|
|
|
|
|
|
+ log.error("ProjectConsumer--cacheManualProject 未知账户类型,不予执行:" + projectJson);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
// 获取拥有的节点数量,即仿真软件证书数量
|
|
// 获取拥有的节点数量,即仿真软件证书数量
|
|
String clusterId = clusterPO.getId();
|
|
String clusterId = clusterPO.getId();
|
|
int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
|
|
int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
|
|
- // 获取该集群中正在运行的项目
|
|
|
|
- Set<String> runningProjectSet = redisTemplate.keys(manualProjectTopic + ":cluster:" + clusterId + ":running" + "*");
|
|
|
|
- // 根据项目 json 获取每个项目的并行度
|
|
|
|
- if (CollectionUtil.isNotEmpty(runningProjectSet)) {
|
|
|
|
- long parallelismSum = 0;
|
|
|
|
- for (String projectKey : runningProjectSet) {
|
|
|
|
- String projectJsonTemp = redisTemplate.opsForValue().get(projectKey);
|
|
|
|
- ProjectMessageDTO projectMessageTemp = JsonUtil.jsonToBean(projectJsonTemp, ProjectMessageDTO.class);
|
|
|
|
- parallelismSum += projectMessageTemp.getParallelism();
|
|
|
|
- }
|
|
|
|
- if (parallelismSum + parallelism <= simulationLicenseNumber) {
|
|
|
|
- log.info("ManualProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 执行!");
|
|
|
|
- redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":running:" + projectId, projectJson);
|
|
|
|
- parseManualProject(projectJson);
|
|
|
|
- } else {
|
|
|
|
- log.info("ManualProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
|
|
|
|
- redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":waiting:" + projectId, projectJson);
|
|
|
|
- }
|
|
|
|
|
|
+ // 获取该集群中正在运行的项目,如果没有则立即执行
|
|
|
|
+ clusterPrefix = "cluster:" + clusterId;
|
|
|
|
+ clusterRunningPrefix = clusterPrefix + ":running";
|
|
|
|
+ clusterWaitingPrefix = clusterPrefix + ":waiting";
|
|
|
|
+ projectRunningKey = clusterRunningPrefix + ":" + projectId;
|
|
|
|
+ projectWaitingKey = clusterWaitingPrefix + ":" + projectId;
|
|
|
|
+ Set<String> runningProjectSet = redisTemplate.keys(clusterRunningPrefix + "*");
|
|
|
|
+ if (CollectionUtil.isEmpty(runningProjectSet)) {
|
|
|
|
+ run(clusterId, projectId, projectRunningKey, projectJson);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ // 计算正在运行的项目的并行度总和
|
|
|
|
+ long parallelismSum = 0;
|
|
|
|
+ for (String projectKey : runningProjectSet) {
|
|
|
|
+ String projectJsonTemp = redisTemplate.opsForValue().get(projectKey);
|
|
|
|
+ ProjectMessageDTO projectMessageTemp = JsonUtil.jsonToBean(projectJsonTemp, ProjectMessageDTO.class);
|
|
|
|
+ parallelismSum += projectMessageTemp.getParallelism();
|
|
|
|
+ }
|
|
|
|
+ // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
|
|
+ if (parallelismSum + parallelism <= simulationLicenseNumber) {
|
|
|
|
+ run(clusterId, projectId, projectRunningKey, projectJson);
|
|
} else {
|
|
} else {
|
|
- log.info("ManualProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 执行!");
|
|
|
|
- redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":running:" + projectId, projectJson);
|
|
|
|
- parseManualProject(projectJson);
|
|
|
|
|
|
+ wait(clusterId, projectId, projectWaitingKey, projectJson);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public void run(String clusterId, String projectId, String projectRunningKey, String projectJson) {
|
|
|
|
+ log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + " 执行!");
|
|
|
|
+ redisTemplate.opsForValue().set(projectRunningKey, projectJson);
|
|
|
|
+ parseManualProject(projectJson, "cluster:" + clusterId, projectRunningKey);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void wait(String clusterId, String projectId, String projectWaitingKey, String projectJson) {
|
|
|
|
+ log.info("ProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
|
|
|
|
+ redisTemplate.opsForValue().set(projectWaitingKey, projectJson);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
|
|
/**
|
|
/**
|
|
* 开始执行以及重新执行
|
|
* 开始执行以及重新执行
|
|
@@ -157,42 +185,42 @@ public class ManualProjectConsumer {
|
|
* @param projectJson 项目启动消息
|
|
* @param projectJson 项目启动消息
|
|
*/
|
|
*/
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
- public void parseManualProject(String projectJson) {
|
|
|
|
|
|
+ public void parseManualProject(String projectJson, String clusterPrefix, String projectRunningPrefix) {
|
|
|
|
|
|
// -------------------------------- 0 准备 --------------------------------
|
|
// -------------------------------- 0 准备 --------------------------------
|
|
- log.info("------- ManualProjectConsumer 接收到项目开始消息为:" + projectJson);
|
|
|
|
|
|
+ log.info("ProjectConsumer--parseManualProject 接收到项目开始消息为:" + projectJson);
|
|
//1 读取 kafka 的 project 信息
|
|
//1 读取 kafka 的 project 信息
|
|
ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
String packageId = projectMessageDTO.getScenePackageId(); // 场景测试包 id
|
|
String packageId = projectMessageDTO.getScenePackageId(); // 场景测试包 id
|
|
- int maxSimulationTime = projectMessageDTO.getMaxSimulationTime(); // 最大仿真时间,即生成视频的时间长度
|
|
|
|
|
|
+ Long maxSimulationTime = projectMessageDTO.getMaxSimulationTime(); // 最大仿真时间,即生成视频的时间长度
|
|
String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
|
|
String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
|
|
String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
String userId = manualProjectMapper.selectCreateUserById(projectId);
|
|
String userId = manualProjectMapper.selectCreateUserById(projectId);
|
|
- int parallelism = projectMessageDTO.getParallelism(); // 并行度
|
|
|
|
|
|
+ Long parallelism = projectMessageDTO.getParallelism(); // 并行度
|
|
//2 执行前准备,删除改项目下所有任务,即重新执行改项目时需要新的测试包
|
|
//2 执行前准备,删除改项目下所有任务,即重新执行改项目时需要新的测试包
|
|
- manualProjectService.prepare(manualProjectTopic, userId, projectId, projectJson);
|
|
|
|
|
|
+ manualProjectService.prepare(clusterPrefix, projectId, projectJson);
|
|
// -------------------------------- 1 查询场景 --------------------------------
|
|
// -------------------------------- 1 查询场景 --------------------------------
|
|
//1-1 根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
|
|
//1-1 根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
|
|
- List<ScenePO> scenePOList = manualProjectService.handlePackage(manualProjectTopic, userId, projectId, packageId);
|
|
|
|
- Set<ScenePO> scenePOSet = new HashSet<>(scenePOList);
|
|
|
|
|
|
+ List<ScenePO> scenePOList = manualProjectService.handlePackage(projectRunningPrefix, projectId, packageId);
|
|
|
|
+ Set<ScenePO> scenePOSet = new HashSet<>(scenePOList); // 如果不去重的话会出现多个场景重复关联多个指标
|
|
// -------------------------------- 2 查询模型 --------------------------------
|
|
// -------------------------------- 2 查询模型 --------------------------------
|
|
//2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
//2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
// -------------------------------- 3 发送任务消息 --------------------------------
|
|
// -------------------------------- 3 发送任务消息 --------------------------------
|
|
- manualProjectService.sendTaskMessage(manualProjectTopic, userId, projectId, maxSimulationTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
|
|
|
|
|
|
+ manualProjectService.sendTaskMessage(projectRunningPrefix, userId, projectId, maxSimulationTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
|
|
// -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
|
|
// -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
|
|
String algorithmDockerImage = manualProjectService.handleAlgorithm(projectId, algorithmId);
|
|
String algorithmDockerImage = manualProjectService.handleAlgorithm(projectId, algorithmId);
|
|
// -------------------------------- 5 创建 pod 开始执行 --------------------------------
|
|
// -------------------------------- 5 创建 pod 开始执行 --------------------------------
|
|
int completions = scenePOList.size(); // 结束标
|
|
int completions = scenePOList.size(); // 结束标
|
|
- log.info("------- ManualProjectConsumer 项目 " + projectId + " 的完成度为:" + completions);
|
|
|
|
- log.info("------- ManualProjectConsumer 项目 " + projectId + " 的并行度为:" + parallelism);
|
|
|
|
|
|
+ log.info("ProjectConsumer--parseManualProject 项目 " + projectId + " 的完成度为:" + completions);
|
|
|
|
+ log.info("ProjectConsumer--parseManualProject 项目 " + projectId + " 的并行度为:" + parallelism);
|
|
String jobTemplateYamlPathSource = jobTemplate + "job-template.yaml";
|
|
String jobTemplateYamlPathSource = jobTemplate + "job-template.yaml";
|
|
String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
|
|
String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
|
|
String yamlSource = FileUtil.read(jobTemplateYamlPathSource);
|
|
String yamlSource = FileUtil.read(jobTemplateYamlPathSource);
|
|
- log.info("------- ManualProjectConsumer 模板文件为:" + yamlSource);
|
|
|
|
|
|
+ log.info("ProjectConsumer--parseManualProject 模板文件为:" + yamlSource);
|
|
String replace0 = yamlSource.replace("job-cloud-simulation", "project-" + projectId);
|
|
String replace0 = yamlSource.replace("job-cloud-simulation", "project-" + projectId);
|
|
String replace1 = replace0.replace("vtd-container", "vtd-" + projectId);
|
|
String replace1 = replace0.replace("vtd-container", "vtd-" + projectId);
|
|
String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
|
|
String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
|
|
@@ -202,7 +230,7 @@ public class ManualProjectConsumer {
|
|
String replace6 = replace5.replace("parallelism-number", parallelism + "");
|
|
String replace6 = replace5.replace("parallelism-number", parallelism + "");
|
|
String replace7 = replace6.replace("apiVers1on", "apiVersion");
|
|
String replace7 = replace6.replace("apiVers1on", "apiVersion");
|
|
String replace8 = replace7.replace("1atch/v1", "batch/v1");
|
|
String replace8 = replace7.replace("1atch/v1", "batch/v1");
|
|
- log.info("------- ManualProjectConsumer 开始执行 yaml 文件" + replace8);
|
|
|
|
|
|
+ log.info("ProjectConsumer--parseManualProject 开始执行 yaml 文件" + replace8);
|
|
FileUtil.writeStringToLocalFile(replace8, jobTemplateYamlPathTarget);
|
|
FileUtil.writeStringToLocalFile(replace8, jobTemplateYamlPathTarget);
|
|
|
|
|
|
|
|
|
|
@@ -216,7 +244,7 @@ public class ManualProjectConsumer {
|
|
@KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.stop-topic}")
|
|
@KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.stop-topic}")
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
public void stopManualProject(ConsumerRecord<String, String> stopRecord) {
|
|
public void stopManualProject(ConsumerRecord<String, String> stopRecord) {
|
|
- log.info("ManualProjectConsumer--stopManualProject 接收到的项目终止消息为:" + stopRecord);
|
|
|
|
|
|
+ log.info("ProjectConsumer--stopManualProject 接收到的项目终止消息为:" + stopRecord);
|
|
//1 读取 kafka 的项目停止信息
|
|
//1 读取 kafka 的项目停止信息
|
|
/*
|
|
/*
|
|
{
|
|
{
|