|
@@ -8,7 +8,6 @@ import api.common.util.JsonUtil;
|
|
|
import api.common.util.StringUtil;
|
|
|
import com.css.simulation.resource.scheduler.mapper.*;
|
|
|
import com.css.simulation.resource.scheduler.pojo.po.*;
|
|
|
-import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
|
|
|
import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
|
|
|
import com.css.simulation.resource.scheduler.service.ProjectService;
|
|
|
import com.css.simulation.resource.scheduler.util.ProjectUtil;
|
|
@@ -25,6 +24,7 @@ import org.springframework.stereotype.Component;
|
|
|
import javax.annotation.Resource;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
|
|
|
@Component
|
|
@@ -33,10 +33,7 @@ public class ProjectConsumer {
|
|
|
|
|
|
@Value("${scheduler.minio-path.project-result}")
|
|
|
String resultPathMinio;
|
|
|
- @Value("${scheduler.linux-path.job-template}")
|
|
|
- String jobTemplate;
|
|
|
- @Value("${scheduler.linux-path.job-yaml}")
|
|
|
- String jobYaml;
|
|
|
+
|
|
|
|
|
|
@Value("${scheduler.host.hostname}")
|
|
|
String hostname;
|
|
@@ -77,10 +74,10 @@ public class ProjectConsumer {
|
|
|
@KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.start-topic}")
|
|
|
@SneakyThrows
|
|
|
public void cacheProject(ConsumerRecord<String, String> projectRecord) {
|
|
|
- String projectJson = projectRecord.value();
|
|
|
- log.info("ProjectConsumer--cacheManualProject 接收到项目开始消息为:" + projectJson);
|
|
|
+ String initialProjectJson = projectRecord.value();
|
|
|
+ log.info("ProjectConsumer--cacheManualProject 接收到项目开始消息为:" + initialProjectJson);
|
|
|
//1 读取 kafka 的 project 信息
|
|
|
- ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
|
+ ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(initialProjectJson, ProjectMessageDTO.class);
|
|
|
String projectId = projectMessageDTO.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
long parallelism = projectMessageDTO.getParallelism(); // 项目并行度
|
|
|
String projectType = projectMessageDTO.getType(); // 项目类型
|
|
@@ -91,11 +88,11 @@ public class ProjectConsumer {
|
|
|
} else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
|
userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
|
} else {
|
|
|
- log.error("ProjectConsumer--cacheManualProject 项目类型错误:" + projectJson);
|
|
|
+ log.error("ProjectConsumer--cacheManualProject 项目类型错误:" + initialProjectJson);
|
|
|
return;
|
|
|
}
|
|
|
if (StringUtil.isEmpty(userId)) {
|
|
|
- log.error("ProjectConsumer--cacheManualProject 未查询到项目创建人:" + projectJson);
|
|
|
+ log.error("ProjectConsumer--cacheManualProject 未查询到项目创建人:" + initialProjectJson);
|
|
|
return;
|
|
|
}
|
|
|
//3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
@@ -107,7 +104,7 @@ public class ProjectConsumer {
|
|
|
if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
log.info("ProjectConsumer--cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
|
PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
- run(DictConstants.SYSTEM_CLUSTER_ID, projectId, projectType, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), projectJson, parallelism);
|
|
|
+ run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
return;
|
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
clusterPO = clusterMapper.selectByUserId(userId);
|
|
@@ -133,12 +130,12 @@ public class ProjectConsumer {
|
|
|
Set<String> clusterRunningKeySet = stringRedisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
|
|
|
List<String> runningProjectSet;
|
|
|
if (CollectionUtil.isEmpty(clusterRunningKeySet)) {
|
|
|
- run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), projectJson, parallelism);
|
|
|
+ run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
return;
|
|
|
}
|
|
|
runningProjectSet = projectUtil.getRunningProjectList(clusterRunningKeySet);
|
|
|
if (CollectionUtil.isEmpty(runningProjectSet)) {
|
|
|
- run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), projectJson, parallelism);
|
|
|
+ run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
return;
|
|
|
}
|
|
|
// 计算正在运行的项目的并行度总和
|
|
@@ -150,72 +147,67 @@ public class ProjectConsumer {
|
|
|
}
|
|
|
// 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
|
if (parallelismSum + parallelism <= simulationLicenseNumber) {
|
|
|
- run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), projectJson, parallelism);
|
|
|
+ run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
} else {
|
|
|
- wait(clusterId, projectId, redisPrefix.getProjectWaitingKey(), projectJson);
|
|
|
+ wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @param clusterId
|
|
|
- * @param projectId
|
|
|
- * @param projectType
|
|
|
- * @param projectRunningKey
|
|
|
- * @param projectJson
|
|
|
- * @param parallelism
|
|
|
- * @return
|
|
|
+ * @param projectMessageDTO 初始接收到的项目启动信息
|
|
|
+ * @param clusterId 集群 id
|
|
|
+ * @param projectRunningKey projectRunningKey
|
|
|
+ * @param projectWaitingKey projectWaitingKey
|
|
|
*/
|
|
|
- public void run(String clusterId, String projectId, String projectType, String projectRunningKey, String projectWaitingKey, String projectJson, long parallelism) {
|
|
|
+ public void run(ProjectMessageDTO projectMessageDTO, String clusterId, String projectRunningKey, String projectWaitingKey) {
|
|
|
|
|
|
- //1 获取一个剩余可用并行度最大的节点
|
|
|
- KubernetesNodeTO maxParallelismNodeTO = projectUtil.getMaxParallelismNode();
|
|
|
- String maxRestParallelismNode = maxParallelismNodeTO.getName();
|
|
|
- long maxRestParallelism = maxParallelismNodeTO.getMaxParallelism();
|
|
|
- log.info("ProjectConsumer--run 准备在节点 " + maxParallelismNodeTO + " 执行项目 " + projectId + "。");
|
|
|
+ String projectId = projectMessageDTO.getProjectId();
|
|
|
+ int parallelism = projectMessageDTO.getParallelism(); // 期望并行度
|
|
|
+ //1 获取所有节点的剩余可用并行度
|
|
|
+ Map<String, Integer> nodeMap = projectUtil.getNodeMap(parallelism);
|
|
|
+ if (nodeMap.size() == 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //2 计算实际可用并行度
|
|
|
+ int parallelismSum = nodeMap.keySet().stream().mapToInt(nodeMap::get).sum();
|
|
|
|
|
|
//2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
|
|
|
- if (maxRestParallelism >= parallelism) {
|
|
|
- log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
|
|
|
- parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, parallelism);
|
|
|
- } else if (maxRestParallelism > 0L) {
|
|
|
- log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
|
|
|
- parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, maxRestParallelism);
|
|
|
+ if (parallelismSum > 0L) {
|
|
|
+ log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点 " + nodeMap + " 上以并行度 " + parallelismSum + " 执行!");
|
|
|
+ projectMessageDTO.setCurrentParallelism(parallelismSum); // 设置实际的并行度
|
|
|
+ parseProject(nodeMap, projectMessageDTO, "cluster:" + clusterId, projectRunningKey);
|
|
|
} else {
|
|
|
- wait(clusterId, projectId, projectWaitingKey, projectJson);
|
|
|
log.info("ProjectConsumer--cacheManualProject 服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
|
|
|
+ wait(projectWaitingKey, projectMessageDTO);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @param clusterId
|
|
|
- * @param projectId
|
|
|
- * @param projectWaitingKey
|
|
|
- * @param projectJson
|
|
|
+ * @param projectWaitingKey 项目等待 key
|
|
|
+ * @param projectMessageDTO 项目信息
|
|
|
*/
|
|
|
- public void wait(String clusterId, String projectId, String projectWaitingKey, String projectJson) {
|
|
|
- log.info("ProjectConsumer--wait 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
|
|
|
- stringRedisTemplate.opsForValue().set(projectWaitingKey, projectJson);
|
|
|
+ @SneakyThrows
|
|
|
+ public void wait(String projectWaitingKey, ProjectMessageDTO projectMessageDTO) {
|
|
|
+ stringRedisTemplate.opsForValue().set(projectWaitingKey, JsonUtil.beanToJson(projectMessageDTO));
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * @param projectId
|
|
|
- * @param projectJson
|
|
|
- * @param clusterPrefix
|
|
|
- * @param projectRunningPrefix projectRunningKey
|
|
|
- * @param nodeName
|
|
|
- * @param parallelism
|
|
|
+ * @param nodeMap 节点列表以及剩余可用并行度
|
|
|
+ * @param projectMessageDTO 初始接收到的项目启动信息
|
|
|
+ * @param clusterPrefix clusterPrefix
|
|
|
+ * @param projectRunningKey projectRunningKey
|
|
|
*/
|
|
|
@SneakyThrows
|
|
|
- public void parseProject(String projectId, String projectType, String projectJson, String clusterPrefix, String projectRunningPrefix, String nodeName, long parallelism) {
|
|
|
- // -------------------------------- 0 准备 --------------------------------
|
|
|
- projectService.prepare(clusterPrefix, projectId, projectType, projectRunningPrefix, projectJson, nodeName, parallelism);
|
|
|
- log.info("ProjectConsumer--parseManualProject 接收到项目开始消息为:" + projectJson);
|
|
|
- ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
|
+ public void parseProject(Map<String, Integer> nodeMap, ProjectMessageDTO projectMessageDTO, String clusterPrefix, String projectRunningKey) {
|
|
|
+ String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
+ String projectType = projectMessageDTO.getType(); // 项目类型
|
|
|
String packageId = projectMessageDTO.getScenePackageId(); // 场景测试包 id
|
|
|
- Long maxSimulationTime = projectMessageDTO.getMaxSimulationTime(); // 最大仿真时间,即生成视频的时间长度
|
|
|
+ long videoTime = projectMessageDTO.getMaxSimulationTime(); // 结果视频的时长
|
|
|
String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
|
|
|
String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
|
+ // -------------------------------- 0 准备 --------------------------------
|
|
|
+ projectService.prepare(nodeMap, projectMessageDTO, clusterPrefix, projectRunningKey);
|
|
|
String userId = null;
|
|
|
if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
userId = manualProjectMapper.selectCreateUserById(projectId);
|
|
@@ -224,27 +216,26 @@ public class ProjectConsumer {
|
|
|
}
|
|
|
// -------------------------------- 1 查询场景 --------------------------------
|
|
|
//1-1 根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
|
|
|
- List<ScenePO> scenePOList = projectService.handlePackage(projectRunningPrefix, projectId, packageId);
|
|
|
+ List<ScenePO> scenePOList = projectService.handlePackage(projectRunningKey, projectId, packageId);
|
|
|
+ int taskTotal = scenePOList.size();
|
|
|
+ projectMessageDTO.setTaskTotal(taskTotal);
|
|
|
+ projectMessageDTO.setTaskCompleted(0);
|
|
|
+ // 设置任务数量之后将项目运行信息放入 redis
|
|
|
+ stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
|
|
|
Set<ScenePO> scenePOSet = new HashSet<>(scenePOList); // 如果不去重的话会出现多个场景重复关联多个指标
|
|
|
+
|
|
|
// -------------------------------- 2 查询模型 --------------------------------
|
|
|
//2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
|
VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
// -------------------------------- 3 发送任务消息 --------------------------------
|
|
|
- projectService.sendTaskMessage(projectRunningPrefix, userId, projectId, projectType, maxSimulationTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
|
|
|
+ projectService.sendTaskMessage(projectRunningKey, userId, projectId, projectType, videoTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
|
|
|
// -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
|
|
|
String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
|
|
|
// -------------------------------- 5 创建 pod 开始执行 --------------------------------
|
|
|
- projectService.transferAndRunYaml(
|
|
|
- nodeName,
|
|
|
- jobTemplate + "job-template.yaml",
|
|
|
- projectId,
|
|
|
- algorithmDockerImage,
|
|
|
- scenePOList.size(),
|
|
|
- parallelism,
|
|
|
- jobYaml + "project-" + projectId + ".yaml"
|
|
|
- );
|
|
|
+ //TODO
|
|
|
+ projectService.transferAndRunYaml(projectId, nodeMap, algorithmDockerImage);
|
|
|
|
|
|
}
|
|
|
|