|
@@ -5,6 +5,8 @@ import api.common.pojo.po.scheduler.SchedulerProjectPO;
|
|
|
import api.common.util.*;
|
|
|
import com.css.simulation.resource.scheduler.api.model.ProjectMessageModel;
|
|
|
import com.css.simulation.resource.scheduler.application.entity.VehicleEntity;
|
|
|
+import com.css.simulation.resource.scheduler.domain.service.ProjectDomainService;
|
|
|
+import com.css.simulation.resource.scheduler.domain.service.TaskDomainService;
|
|
|
import com.css.simulation.resource.scheduler.infrastructure.configuration.custom.CustomConfiguration;
|
|
|
import com.css.simulation.resource.scheduler.infrastructure.configuration.docker.DockerConfiguration;
|
|
|
import com.css.simulation.resource.scheduler.infrastructure.configuration.entity.DynamicsEntity;
|
|
@@ -14,7 +16,10 @@ import com.css.simulation.resource.scheduler.infrastructure.configuration.minio.
|
|
|
import com.css.simulation.resource.scheduler.infrastructure.entity.*;
|
|
|
import com.css.simulation.resource.scheduler.infrastructure.persistence.mysql.mapper.*;
|
|
|
import com.css.simulation.resource.scheduler.infrastructure.persistence.redis.CustomRedisClient;
|
|
|
-import com.css.simulation.resource.scheduler.infrastructure.util.*;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.util.KafkaUtil;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.util.GitUtil;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.util.MinioUtil;
|
|
|
+import com.css.simulation.resource.scheduler.infrastructure.util.RedisUtil;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import io.minio.MinioClient;
|
|
|
import lombok.SneakyThrows;
|
|
@@ -27,6 +32,7 @@ import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
|
import org.springframework.kafka.support.SendResult;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
@@ -78,7 +84,7 @@ public class ProjectService {
|
|
|
@Resource
|
|
|
private TaskMapper taskMapper;
|
|
|
@Resource
|
|
|
- private TaskUtil taskUtil;
|
|
|
+ private TaskDomainService taskDomainService;
|
|
|
@Resource
|
|
|
private KafkaTemplate<String, String> kafkaTemplate;
|
|
|
@Resource(name = "myKafkaAdmin")
|
|
@@ -90,7 +96,7 @@ public class ProjectService {
|
|
|
@Resource
|
|
|
private MinioConfiguration minioConfiguration;
|
|
|
@Resource
|
|
|
- private ProjectUtil projectUtil;
|
|
|
+ private ProjectDomainService projectDomainService;
|
|
|
@Resource
|
|
|
private CloseableHttpClient closeableHttpClient;
|
|
|
@Resource
|
|
@@ -112,6 +118,11 @@ public class ProjectService {
|
|
|
|
|
|
// -------------------------------- Comment --------------------------------
|
|
|
|
|
|
+ @Async("pool1")
|
|
|
+ public void runProject(ProjectMessageModel projectMessageModel){
|
|
|
+ createTaskAndFixData(projectMessageModel);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 接收到运行信息立即复制一份数据作为据运行数
|
|
|
*
|
|
@@ -121,14 +132,13 @@ public class ProjectService {
|
|
|
//* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
|
|
|
String projectId = projectMessageModel.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
String projectType = projectMessageModel.getType(); // 项目类型
|
|
|
- String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
|
|
|
try {
|
|
|
String modelType = projectMessageModel.getModelType(); // 模型类型,1 动力学模型 2 carsim模型
|
|
|
String packageId = projectMessageModel.getScenePackageId(); // 场景测试包 id
|
|
|
String vehicleConfigId = projectMessageModel.getVehicleConfigId(); // 模型配置 id
|
|
|
String algorithmId = projectMessageModel.getAlgorithmId(); // 模型配置 id
|
|
|
long videoTime = projectMessageModel.getMaxSimulationTime(); // 结果视频的时长
|
|
|
- String userId = projectUtil.getUserIdByProjectIdAndProjectType(projectId, projectType);
|
|
|
+ String userId = projectDomainService.getUserIdByProjectIdAndProjectType(projectId, projectType);
|
|
|
String projectPath = linuxTempPath + "project/" + projectId + "/";
|
|
|
FileUtil.mkdir(projectPath);
|
|
|
//5 将该 project 下所有旧的指标得分删除。
|
|
@@ -224,7 +234,7 @@ public class ProjectService {
|
|
|
log.debug("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
}
|
|
|
}
|
|
|
- taskUtil.batchInsertTask(taskList);
|
|
|
+ taskDomainService.batchInsertTask(taskList);
|
|
|
log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
|
|
|
} else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
|
|
|
|
|
@@ -294,7 +304,7 @@ public class ProjectService {
|
|
|
log.debug("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
}
|
|
|
}
|
|
|
- taskUtil.batchInsertTask(taskList);
|
|
|
+ taskDomainService.batchInsertTask(taskList);
|
|
|
log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
|
|
|
}
|
|
|
|
|
@@ -302,7 +312,7 @@ public class ProjectService {
|
|
|
cacheProject(projectMessageModel);
|
|
|
} catch (Exception e) {
|
|
|
log.error("项目报错。", e);
|
|
|
- stopProject(isChoiceGpu, projectId, projectType, e.getMessage());
|
|
|
+ stopProject(projectId, projectType, e.getMessage());
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
|
|
|
@@ -323,7 +333,7 @@ public class ProjectService {
|
|
|
long parallelism = projectMessageModel.getParallelism(); // 项目并行度
|
|
|
String projectType = projectMessageModel.getType(); // 项目类型
|
|
|
//2 获取用户信息(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
|
- final UserEntity userEntity = projectUtil.getUserEntityByProjectIdAndProjectType(projectId, projectType);
|
|
|
+ final UserEntity userEntity = projectDomainService.getUserEntityByProjectIdAndProjectType(projectId, projectType);
|
|
|
String projectUserId = userEntity.getId();
|
|
|
log.debug("项目 " + projectId + " 的创建人为:" + userEntity);
|
|
|
String roleCode = userEntity.getRoleCode();
|
|
@@ -333,7 +343,7 @@ public class ProjectService {
|
|
|
if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
clusterUserId = DictConstants.SYSTEM_CLUSTER_ID;
|
|
|
log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
|
- PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
+ PrefixEntity redisPrefix = projectDomainService.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
run(projectMessageModel, clusterUserId, modelType, DictConstants.SYSTEM_USER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
return;
|
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
@@ -356,8 +366,8 @@ public class ProjectService {
|
|
|
throw new RuntimeException("未知角色类型:" + roleCode);
|
|
|
}
|
|
|
// 获取仿真软件证书数量和动力学软件证书数量(vtd占一个仿真证书,carsim各占一个)
|
|
|
- PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterEntity.getId(), projectId);
|
|
|
- final Integer usingSimulationLicenseNumber = projectUtil.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_SIMULATION);
|
|
|
+ PrefixEntity redisPrefix = projectDomainService.getRedisPrefixByClusterIdAndProjectId(clusterEntity.getId(), projectId);
|
|
|
+ final Integer usingSimulationLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_SIMULATION);
|
|
|
final Integer usingDynamicLicenseNumber;
|
|
|
final Integer numSimulationLicense = clusterEntity.getNumSimulationLicense();
|
|
|
final Integer numDynamicLicense = clusterEntity.getNumDynamicLicense();
|
|
@@ -378,7 +388,7 @@ public class ProjectService {
|
|
|
run(projectMessageModel, clusterUserId, modelType, clusterId, projectRunningKey, projectWaitingKey);
|
|
|
}
|
|
|
} else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
|
|
|
- usingDynamicLicenseNumber = projectUtil.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
|
|
|
+ usingDynamicLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
|
|
|
remainderDynamicLicense = numDynamicLicense - usingDynamicLicenseNumber;
|
|
|
if (remainderSimulationLicense <= 0 || remainderDynamicLicense <= 0) {
|
|
|
wait(DictConstants.PROJECT_WAIT_TYPE_EXECUTE, projectWaitingKey, projectMessageModel);
|
|
@@ -395,8 +405,8 @@ public class ProjectService {
|
|
|
//* -------------------------------- 等待 --------------------------------
|
|
|
|
|
|
/**
|
|
|
- * @param waitType 等待类型 1等待执行 2等待扩充
|
|
|
- * @param projectWaitingKey 项目等待 key
|
|
|
+ * @param waitType 等待类型 1等待执行 2等待扩充
|
|
|
+ * @param projectWaitingKey 项目等待 key
|
|
|
* @param projectMessageModel 项目信息
|
|
|
*/
|
|
|
public void wait(String waitType, String projectWaitingKey, ProjectMessageModel projectMessageModel) {
|
|
@@ -415,21 +425,21 @@ public class ProjectService {
|
|
|
|
|
|
/**
|
|
|
* @param projectMessageModel 初始接收到的项目启动信息
|
|
|
- * @param clusterId 集群ID
|
|
|
- * @param projectRunningKey projectRunningKey
|
|
|
- * @param projectWaitingKey projectWaitingKey
|
|
|
+ * @param clusterId 集群ID
|
|
|
+ * @param projectRunningKey projectRunningKey
|
|
|
+ * @param projectWaitingKey projectWaitingKey
|
|
|
*/
|
|
|
public void run(ProjectMessageModel projectMessageModel, String clusterUserId, String modelType, String clusterId, String projectRunningKey, String projectWaitingKey) {
|
|
|
String projectId = projectMessageModel.getProjectId(); // 项目 id
|
|
|
- String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
|
|
|
+ String isChoiceGpu = projectDomainService.getIsChoiceGpuByProjectId(projectId);
|
|
|
int parallelism = projectMessageModel.getParallelism(); // 期望并行度
|
|
|
//1 获取集群剩余可用并行度
|
|
|
- int restParallelism = projectUtil.getRestParallelism(isChoiceGpu);
|
|
|
+ int restParallelism = projectDomainService.getRestParallelism(isChoiceGpu);
|
|
|
//2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
|
|
|
if (restParallelism > 0L) {
|
|
|
log.info("集群 " + clusterId + " 执行项目 " + projectId);
|
|
|
if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)) {
|
|
|
- projectUtil.useLicense(clusterUserId, modelType, parallelism);
|
|
|
+ projectDomainService.useLicense(clusterUserId, modelType, parallelism);
|
|
|
}
|
|
|
// 设置实际的并行度
|
|
|
projectMessageModel.setCurrentParallelism(Math.min(restParallelism, parallelism)); // 设置实际的并行度
|
|
@@ -444,7 +454,7 @@ public class ProjectService {
|
|
|
* 运行项目
|
|
|
*
|
|
|
* @param projectMessageModel 初始接收到的项目启动信息
|
|
|
- * @param projectRunningKey projectRunningKey
|
|
|
+ * @param projectRunningKey projectRunningKey
|
|
|
*/
|
|
|
@SneakyThrows
|
|
|
public void parseProject(ProjectMessageModel projectMessageModel, String projectRunningKey, String isChoiceGpu) {
|
|
@@ -461,10 +471,10 @@ public class ProjectService {
|
|
|
projectMessageModel.setTaskCompleted(0);
|
|
|
// 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
|
|
|
//1 获取剩余并行度和即将使用的各node的并行度
|
|
|
- Map<String, Integer> nodeMap0 = projectUtil.getNodeMap(isChoiceGpu);
|
|
|
- Map<String, Integer> nodeMap = projectUtil.getNodeMapToUse(isChoiceGpu, Math.min(currentParallelism, taskTotal));
|
|
|
+ Map<String, Integer> nodeMap0 = projectDomainService.getNodeMap(isChoiceGpu);
|
|
|
+ Map<String, Integer> nodeMap = projectDomainService.getNodeMapToUse(isChoiceGpu, Math.min(currentParallelism, taskTotal));
|
|
|
//2 将指定 node 的并行度减少
|
|
|
- nodeMap.keySet().forEach(nodeName -> projectUtil.decrementParallelism(isChoiceGpu, nodeName, nodeMap.get(nodeName)));
|
|
|
+ nodeMap.keySet().forEach(nodeName -> projectDomainService.decrementParallelism(isChoiceGpu, nodeName, nodeMap.get(nodeName)));
|
|
|
// 重新设置实际使用的并行度并保存到 redis
|
|
|
int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
|
|
|
projectMessageModel.setCurrentParallelism(realCurrentParallelism);
|
|
@@ -473,9 +483,9 @@ public class ProjectService {
|
|
|
//* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
|
|
|
String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
|
|
|
// -------------------------------- 4 发送任务消息 --------------------------------
|
|
|
- List<NodeEntity> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
|
|
|
+ List<NodeEntity> nodeListToCount = projectDomainService.getNodeListToCount(nodeMap);
|
|
|
int messageNumber = 0;
|
|
|
- ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1); // 创建主题
|
|
|
+ KafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1); // 创建主题
|
|
|
TimeUnit.SECONDS.sleep(7);
|
|
|
// 需要即时启动的任务(并行度的大小)
|
|
|
CopyOnWriteArrayList<String> yamlToRunRedisKeyList = new CopyOnWriteArrayList<>();
|
|
@@ -534,7 +544,7 @@ public class ProjectService {
|
|
|
TimeUnit.SECONDS.sleep(6);
|
|
|
log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
|
|
|
for (String redisKey : yamlToRunRedisKeyList) {
|
|
|
- projectUtil.createPodBegin(projectId, redisKey);
|
|
|
+ projectDomainService.createPodBegin(projectId, redisKey);
|
|
|
}
|
|
|
log.info("项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
|
|
|
}
|
|
@@ -547,8 +557,8 @@ public class ProjectService {
|
|
|
*/
|
|
|
@SneakyThrows
|
|
|
public String createTempYaml(String projectId, String vehicleConfigId, String modelType, String algorithmDockerImage, String nodeName, int kafkaPartition, long kafkaOffset, String isChoiceGpu, Integer cpuOrder) {
|
|
|
- String podName = projectUtil.getRandomPodName(projectId); // 生成 podName
|
|
|
- String podYaml = projectUtil.getPodYamlName(nodeName, podName); // 模板文件名称
|
|
|
+ String podName = projectDomainService.getRandomPodName(projectId); // 生成 podName
|
|
|
+ String podYaml = projectDomainService.getPodYamlName(nodeName, podName); // 模板文件名称
|
|
|
String yamlPath = podYamlDirectory + podYaml;
|
|
|
String finalYaml;
|
|
|
|
|
@@ -745,7 +755,7 @@ public class ProjectService {
|
|
|
String dockerImageWithoutVersion = dockerConfiguration.getRegistry() + "/algorithm_" + algorithmId;
|
|
|
dockerImage = dockerImageWithoutVersion + ":latest";
|
|
|
//0 查看算法是否已经导入
|
|
|
- if (!projectUtil.isImported(dockerImageWithoutVersion)) {
|
|
|
+ if (!projectDomainService.isImported(dockerImageWithoutVersion)) {
|
|
|
//1 获取 token
|
|
|
String tokenUrl = customConfiguration.getAlgorithmPlatformTokenUri() + "?grant_type=client_credential&appid=" + customConfiguration.getAlgorithmPlatformAppid() + "&secret=" + customConfiguration.getAlgorithmPlatformSecret();
|
|
|
String tokenJson = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
|
|
@@ -770,7 +780,7 @@ public class ProjectService {
|
|
|
return dockerImage;
|
|
|
}
|
|
|
|
|
|
- public void stopProject(String isChoiceGpu, String projectType, String projectId, String errorMessage) {
|
|
|
+ public void stopProject(String projectType, String projectId, String errorMessage) {
|
|
|
Optional.ofNullable(errorMessage).ifPresent(em -> {
|
|
|
if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
manualProjectMapper.saveErrorMessage(SchedulerProjectPO.builder().id(projectId).errorMessage(em).modifyUserId(DictConstants.SCHEDULER_USER_ID).modifyTime(TimeUtil.getNowForMysql()).build());
|
|
@@ -778,7 +788,7 @@ public class ProjectService {
|
|
|
autoSubProjectMapper.saveErrorMessage(SchedulerProjectPO.builder().id(projectId).errorMessage(em).modifyUserId(DictConstants.SCHEDULER_USER_ID).modifyTime(TimeUtil.getNowForMysql()).build());
|
|
|
}
|
|
|
});
|
|
|
- stopProject( projectType, projectId);
|
|
|
+ stopProject(projectType, projectId);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -786,8 +796,8 @@ public class ProjectService {
|
|
|
* @param projectType 项目类型
|
|
|
*/
|
|
|
@SneakyThrows
|
|
|
- public void stopProject( String projectType, String projectId) {
|
|
|
- String isChoiceGpu = projectUtil.getIsChoiceGpuByProjectId(projectId);
|
|
|
+ public void stopProject(String projectType, String projectId) {
|
|
|
+ String isChoiceGpu = projectDomainService.getIsChoiceGpuByProjectId(projectId);
|
|
|
//* -------------------------------- Comment --------------------------------
|
|
|
ProjectEntity projectEntity;
|
|
|
String projectUserId, clusterUserId, modelType, projectRunningKeyPrefix;
|
|
@@ -796,9 +806,9 @@ public class ProjectService {
|
|
|
Set<String> projectRunningKeySet;
|
|
|
boolean isRunning;
|
|
|
//* -------------------------------- Comment --------------------------------
|
|
|
- projectEntity = projectUtil.getProjectByProjectId(projectId);
|
|
|
+ projectEntity = projectDomainService.getProjectByProjectId(projectId);
|
|
|
parallelism = Integer.parseInt(projectEntity.getParallelism());
|
|
|
- redisPrefix = projectUtil.getRedisPrefixByProjectIdAndProjectType(projectId, projectType);
|
|
|
+ redisPrefix = projectDomainService.getRedisPrefixByProjectIdAndProjectType(projectId, projectType);
|
|
|
projectRunningKeyPrefix = redisPrefix.getProjectRunningKey();
|
|
|
//* -------------------------------- Comment --------------------------------
|
|
|
//1 判断项目是否已经运行
|
|
@@ -816,26 +826,26 @@ public class ProjectService {
|
|
|
}
|
|
|
|
|
|
projectUserId = projectEntity.getCreateUserId();
|
|
|
- clusterUserId = projectUtil.getClusterUserIdByProjectUserId(projectUserId);
|
|
|
+ clusterUserId = projectDomainService.getClusterUserIdByProjectUserId(projectUserId);
|
|
|
|
|
|
//3 删除 kafka 消息
|
|
|
- ApacheKafkaUtil.deleteTopic(kafkaAdminClient, projectId);
|
|
|
+ KafkaUtil.deleteTopic(kafkaAdminClient, projectId);
|
|
|
//4 删除项目所有任务
|
|
|
taskMapper.deleteByProject(projectId);
|
|
|
|
|
|
//5 根据 pod 前缀删除所有 pod
|
|
|
- modelType = projectUtil.getModelTypeByProjectIdAndProjectType(projectId, projectType);
|
|
|
+ modelType = projectDomainService.getModelTypeByProjectIdAndProjectType(projectId, projectType);
|
|
|
if (isRunning) {
|
|
|
Set<String> nodeOfPodKeySet = RedisUtil.getKeySetByPrefix(stringRedisTemplate, "project:" + projectId + ":pod:" + "project-" + projectId);
|
|
|
Set<String> podNameSet = nodeOfPodKeySet.stream().map(key -> key.split(":")[3]).collect(Collectors.toSet());
|
|
|
for (String podName : podNameSet) {
|
|
|
- String nodeName = projectUtil.getNodeNameOfPod(projectId, podName);
|
|
|
+ String nodeName = projectDomainService.getNodeNameOfPod(projectId, podName);
|
|
|
// 删除 pod
|
|
|
- projectUtil.deletePod(podName);
|
|
|
+ projectDomainService.deletePod(podName);
|
|
|
// 节点并行度加一
|
|
|
- projectUtil.incrementOneParallelism(isChoiceGpu, nodeName);
|
|
|
+ projectDomainService.incrementOneParallelism(isChoiceGpu, nodeName);
|
|
|
// 释放证书
|
|
|
- projectUtil.releaseLicense(clusterUserId, modelType, parallelism);
|
|
|
+ projectDomainService.releaseLicense(clusterUserId, modelType, parallelism);
|
|
|
}
|
|
|
}
|
|
|
|