|
@@ -12,8 +12,6 @@ import com.css.simulation.resource.scheduler.infrastructure.configuration.custom
|
|
import com.css.simulation.resource.scheduler.infrastructure.configuration.docker.DockerConfiguration;
|
|
import com.css.simulation.resource.scheduler.infrastructure.configuration.docker.DockerConfiguration;
|
|
import com.css.simulation.resource.scheduler.infrastructure.configuration.entity.DynamicsEntity;
|
|
import com.css.simulation.resource.scheduler.infrastructure.configuration.entity.DynamicsEntity;
|
|
import com.css.simulation.resource.scheduler.infrastructure.configuration.git.GitConfiguration;
|
|
import com.css.simulation.resource.scheduler.infrastructure.configuration.git.GitConfiguration;
|
|
-import com.css.simulation.resource.scheduler.infrastructure.configuration.kubernetes.KubernetesConfiguration;
|
|
|
|
-import com.css.simulation.resource.scheduler.infrastructure.configuration.minio.MinioConfiguration;
|
|
|
|
import com.css.simulation.resource.scheduler.infrastructure.entity.*;
|
|
import com.css.simulation.resource.scheduler.infrastructure.entity.*;
|
|
import com.css.simulation.resource.scheduler.infrastructure.persistence.git.GitUtil;
|
|
import com.css.simulation.resource.scheduler.infrastructure.persistence.git.GitUtil;
|
|
import com.css.simulation.resource.scheduler.infrastructure.persistence.kafka.KafkaUtil;
|
|
import com.css.simulation.resource.scheduler.infrastructure.persistence.kafka.KafkaUtil;
|
|
@@ -54,14 +52,6 @@ public class ProjectService {
|
|
private String podYamlDirectory;
|
|
private String podYamlDirectory;
|
|
@Value("${minio.bucket-name}")
|
|
@Value("${minio.bucket-name}")
|
|
private String bucketName;
|
|
private String bucketName;
|
|
- @Value("${scheduler.linux-path.vtd-pod-template-yaml}")
|
|
|
|
- private String vtdPodTemplateYaml;
|
|
|
|
- @Value("${scheduler.linux-path.carsim-pod-template-yaml}")
|
|
|
|
- private String carsimPodTemplateYaml;
|
|
|
|
- @Value("${scheduler.simulation-cloud-ip}")
|
|
|
|
- private String simulationCloudIp;
|
|
|
|
- @Value("${spring.kafka.bootstrap-servers}")
|
|
|
|
- private String kafkaIp;
|
|
|
|
|
|
|
|
// -------------------------------- Comment --------------------------------
|
|
// -------------------------------- Comment --------------------------------
|
|
@Resource
|
|
@Resource
|
|
@@ -91,10 +81,6 @@ public class ProjectService {
|
|
@Resource
|
|
@Resource
|
|
private CustomConfiguration customConfiguration;
|
|
private CustomConfiguration customConfiguration;
|
|
@Resource
|
|
@Resource
|
|
- private KubernetesConfiguration kubernetesConfiguration;
|
|
|
|
- @Resource
|
|
|
|
- private MinioConfiguration minioConfiguration;
|
|
|
|
- @Resource
|
|
|
|
private ProjectDomainService projectDomainService;
|
|
private ProjectDomainService projectDomainService;
|
|
@Resource
|
|
@Resource
|
|
private IndexTemplateMapper indexTemplateMapper;
|
|
private IndexTemplateMapper indexTemplateMapper;
|
|
@@ -123,7 +109,7 @@ public class ProjectService {
|
|
//1 创建任务文件并固定场景数据
|
|
//1 创建任务文件并固定场景数据
|
|
createTaskAndFixData(projectMessageModel);
|
|
createTaskAndFixData(projectMessageModel);
|
|
//2 校验证书和并行度
|
|
//2 校验证书和并行度
|
|
- checkIfCanRun(projectMessageModel, DictConstants.PROJECT_RUN_TYPE_EXECUTE);
|
|
|
|
|
|
+ checkIfCanRun(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_RUN_TYPE_EXECUTE).waitingParallelism(projectMessageModel.getParallelism()).projectMessageModel(projectMessageModel).build());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -322,73 +308,99 @@ public class ProjectService {
|
|
/**
|
|
/**
|
|
* 任务运行前首先判断用户是否拥有可分配资源
|
|
* 任务运行前首先判断用户是否拥有可分配资源
|
|
*
|
|
*
|
|
- * @param projectMessageModel 项目启动消息
|
|
|
|
- * @param runType 运行类型 1执行 2扩充
|
|
|
|
|
|
+ * @param projectWaitQueueEntity 项目启动消息
|
|
*/
|
|
*/
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
- public void checkIfCanRun(ProjectMessageModel projectMessageModel, String runType) {
|
|
|
|
- if (DictConstants.PROJECT_RUN_TYPE_EXECUTE.equals(runType)) {
|
|
|
|
- log.debug("判断用户是否拥有可分配资源:" + projectMessageModel);
|
|
|
|
- //1 读取 kafka 的 project 信息
|
|
|
|
- String modelType = projectMessageModel.getModelType();
|
|
|
|
- String projectId = projectMessageModel.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
|
- int parallelism = projectMessageModel.getParallelism(); // 项目并行度
|
|
|
|
- String projectType = projectMessageModel.getType(); // 项目类型
|
|
|
|
- String isChoiceGpu = projectDomainService.getIsChoiceGpuByProjectId(projectId);
|
|
|
|
- int remainderParallelism = projectDomainService.getRemainderParallelism(isChoiceGpu);
|
|
|
|
- //2 获取用户信息(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
|
|
- final UserEntity userEntity = projectDomainService.getUserEntityByProjectIdAndProjectType(projectId, projectType);
|
|
|
|
- String projectUserId = userEntity.getId();
|
|
|
|
- log.debug("项目 " + projectId + " 的创建人为:" + userEntity);
|
|
|
|
- String roleCode = userEntity.getRoleCode();
|
|
|
|
- String useType = userEntity.getUseType();
|
|
|
|
- ClusterEntity clusterEntity;
|
|
|
|
- String clusterUserId; // 项目实际运行使用的用户集群
|
|
|
|
- if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
|
- clusterUserId = DictConstants.SYSTEM_USER_ID;
|
|
|
|
- log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
|
|
- PrefixEntity redisPrefix = projectDomainService.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
|
- final String projectRunningKey = redisPrefix.getProjectRunningKey();
|
|
|
|
- if (remainderParallelism <= 0) {
|
|
|
|
|
|
+ public void checkIfCanRun(ProjectWaitQueueEntity projectWaitQueueEntity) {
|
|
|
|
+ final String waitingType = projectWaitQueueEntity.getWaitingType();
|
|
|
|
+ final Integer waitingParallelism = projectWaitQueueEntity.getWaitingParallelism();
|
|
|
|
+ final ProjectMessageModel projectMessageModel = projectWaitQueueEntity.getProjectMessageModel();
|
|
|
|
+ log.debug("判断用户是否拥有可分配资源:" + projectMessageModel);
|
|
|
|
+ //1 项目信息
|
|
|
|
+ String modelType = projectMessageModel.getModelType();
|
|
|
|
+ String projectId = projectMessageModel.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
|
+ int parallelism = projectMessageModel.getParallelism(); // 项目并行度
|
|
|
|
+ String projectType = projectMessageModel.getType(); // 项目类型
|
|
|
|
+ String isChoiceGpu = projectDomainService.getIsChoiceGpuByProjectId(projectId);
|
|
|
|
+ //2 剩余并行度
|
|
|
|
+ int remainderParallelism = projectDomainService.getRemainderParallelism(isChoiceGpu);
|
|
|
|
+ //3 用户信息(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
|
|
+ final UserEntity userEntity = projectDomainService.getUserEntityByProjectIdAndProjectType(projectId, projectType);
|
|
|
|
+ String projectUserId = userEntity.getId();
|
|
|
|
+ String roleCode = userEntity.getRoleCode();
|
|
|
|
+ String useType = userEntity.getUseType();
|
|
|
|
+ ClusterEntity clusterEntity;
|
|
|
|
+ String clusterUserId; // 项目实际运行使用的用户集群
|
|
|
|
+ if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
|
+ clusterUserId = DictConstants.SYSTEM_USER_ID;
|
|
|
|
+ log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
|
|
+ PrefixEntity redisPrefix = projectDomainService.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
|
+ final String projectRunningKey = redisPrefix.getProjectRunningKey();
|
|
|
|
+ if (remainderParallelism <= 0) {
|
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectMessageModel(projectMessageModel).build());
|
|
|
|
+ } else if (remainderParallelism < parallelism) {
|
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - remainderParallelism).projectMessageModel(projectMessageModel).build());
|
|
|
|
+ run(clusterUserId, remainderParallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
|
|
+ } else {
|
|
|
|
+ run(clusterUserId, parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
|
|
+ }
|
|
|
|
+ return;
|
|
|
|
+ } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
|
+ clusterUserId = projectUserId;
|
|
|
|
+ clusterEntity = clusterMapper.selectByUserId(clusterUserId);
|
|
|
|
+ log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterEntity);
|
|
|
|
+ } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
|
|
+ if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) { //3-3 普通子账户,根据自己的独占节点排队
|
|
|
|
+ clusterUserId = projectUserId;
|
|
|
|
+ clusterEntity = clusterMapper.selectByUserId(clusterUserId);
|
|
|
|
+ log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通独占子账户(自己的集群),集群为:" + clusterEntity);
|
|
|
|
+ } else if (DictConstants.USER_TYPE_PUBLIC.equals(useType)) { //3-4 共享子账户,根据父账户的共享节点排队
|
|
|
|
+ clusterUserId = userEntity.getCreateUserId();
|
|
|
|
+ clusterEntity = clusterMapper.selectByUserId(clusterUserId);
|
|
|
|
+ log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通共享子账户(父账户的集群),集群为:" + clusterEntity);
|
|
|
|
+ } else {
|
|
|
|
+ throw new RuntimeException("用户" + projectUserId + "未知占用类型:" + useType);
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ throw new RuntimeException("未知角色类型:" + roleCode);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 获取仿真软件证书数量和动力学软件证书数量(vtd占一个仿真证书,carsim各占一个)
|
|
|
|
+ PrefixEntity redisPrefix = projectDomainService.getRedisPrefixByClusterIdAndProjectId(clusterEntity.getId(), projectId);
|
|
|
|
+ final Integer usingSimulationLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_SIMULATION);
|
|
|
|
+ final Integer usingDynamicLicenseNumber;
|
|
|
|
+ final Integer numSimulationLicense = clusterEntity.getNumSimulationLicense();
|
|
|
|
+ final Integer numDynamicLicense = clusterEntity.getNumDynamicLicense();
|
|
|
|
+ final String projectRunningKey = redisPrefix.getProjectRunningKey();
|
|
|
|
+ // 判断仿真证书是否够用,如果证书为0则将项目加入等待队列;如果证书小于并行度则加入扩充队列,并用现有证书执行;如果证书够用,直接执行。
|
|
|
|
+ final int remainderSimulationLicense = numSimulationLicense - usingSimulationLicenseNumber;
|
|
|
|
+ final int remainderDynamicLicense;
|
|
|
|
+ if (DictConstants.PROJECT_WAIT_TYPE_EXECUTE.equals(waitingType)) {
|
|
|
|
+ if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
|
|
|
|
+ if (remainderSimulationLicense <= 0 || remainderParallelism <= 0) {
|
|
wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectMessageModel(projectMessageModel).build());
|
|
wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectMessageModel(projectMessageModel).build());
|
|
- } else if (remainderParallelism < parallelism) {
|
|
|
|
- wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - remainderParallelism).projectMessageModel(projectMessageModel).build());
|
|
|
|
- run(clusterUserId, remainderParallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
|
|
|
|
+ } else if (remainderSimulationLicense < parallelism || remainderParallelism <= parallelism) {
|
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - Math.min(remainderSimulationLicense, remainderParallelism)).projectMessageModel(projectMessageModel).build());
|
|
|
|
+ run(clusterUserId, Math.min(remainderSimulationLicense, remainderParallelism), projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
} else {
|
|
} else {
|
|
run(clusterUserId, parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
run(clusterUserId, parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
}
|
|
}
|
|
- return;
|
|
|
|
- } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
|
- clusterUserId = projectUserId;
|
|
|
|
- clusterEntity = clusterMapper.selectByUserId(clusterUserId);
|
|
|
|
- log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterEntity);
|
|
|
|
- } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
|
|
- if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) { //3-3 普通子账户,根据自己的独占节点排队
|
|
|
|
- clusterUserId = projectUserId;
|
|
|
|
- clusterEntity = clusterMapper.selectByUserId(clusterUserId);
|
|
|
|
- log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通独占子账户(自己的集群),集群为:" + clusterEntity);
|
|
|
|
- } else if (DictConstants.USER_TYPE_PUBLIC.equals(useType)) { //3-4 共享子账户,根据父账户的共享节点排队
|
|
|
|
- clusterUserId = userEntity.getCreateUserId();
|
|
|
|
- clusterEntity = clusterMapper.selectByUserId(clusterUserId);
|
|
|
|
- log.debug("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通共享子账户(父账户的集群),集群为:" + clusterEntity);
|
|
|
|
|
|
+ } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
|
|
|
|
+ usingDynamicLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
|
|
|
|
+ remainderDynamicLicense = numDynamicLicense - usingDynamicLicenseNumber;
|
|
|
|
+ if (remainderSimulationLicense <= 0 || remainderDynamicLicense <= 0 || remainderParallelism <= 0) {
|
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectMessageModel(projectMessageModel).build());
|
|
|
|
+ } else if (remainderSimulationLicense < parallelism || remainderDynamicLicense < parallelism || remainderParallelism < parallelism) {
|
|
|
|
+ wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXPAND).waitingParallelism(parallelism - Math.min(Math.min(remainderSimulationLicense, remainderDynamicLicense), remainderParallelism)).projectMessageModel(projectMessageModel).build());
|
|
|
|
+ run(clusterUserId, Math.min(remainderSimulationLicense, Math.min(remainderDynamicLicense, remainderParallelism)), projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
} else {
|
|
} else {
|
|
- throw new RuntimeException("用户" + projectUserId + "未知占用类型:" + useType);
|
|
|
|
|
|
+ run(clusterUserId, parallelism, projectMessageModel, projectRunningKey, isChoiceGpu);
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- throw new RuntimeException("未知角色类型:" + roleCode);
|
|
|
|
|
|
+ throw new RuntimeException("未知模型类型:" + modelType);
|
|
}
|
|
}
|
|
- // 获取仿真软件证书数量和动力学软件证书数量(vtd占一个仿真证书,carsim各占一个)
|
|
|
|
- PrefixEntity redisPrefix = projectDomainService.getRedisPrefixByClusterIdAndProjectId(clusterEntity.getId(), projectId);
|
|
|
|
- final Integer usingSimulationLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_SIMULATION);
|
|
|
|
- final Integer usingDynamicLicenseNumber;
|
|
|
|
- final Integer numSimulationLicense = clusterEntity.getNumSimulationLicense();
|
|
|
|
- final Integer numDynamicLicense = clusterEntity.getNumDynamicLicense();
|
|
|
|
- final String projectRunningKey = redisPrefix.getProjectRunningKey();
|
|
|
|
- //1 判断仿真证书是否够用,如果证书为0则将项目加入等待队列;如果证书小于并行度则加入扩充队列,并用现有证书执行;如果证书够用,直接执行。
|
|
|
|
- final int remainderSimulationLicense = numSimulationLicense - usingSimulationLicenseNumber;
|
|
|
|
- final int remainderDynamicLicense;
|
|
|
|
-
|
|
|
|
- // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
|
|
|
|
+ } else if (DictConstants.PROJECT_WAIT_TYPE_EXPAND.equals(waitingType)) {
|
|
|
|
+ //TODO
|
|
if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
|
|
if (DictConstants.MODEL_TYPE_VTD.equals(modelType)) {
|
|
if (remainderSimulationLicense <= 0 || remainderParallelism <= 0) {
|
|
if (remainderSimulationLicense <= 0 || remainderParallelism <= 0) {
|
|
wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectMessageModel(projectMessageModel).build());
|
|
wait(ProjectWaitQueueEntity.builder().waitingType(DictConstants.PROJECT_WAIT_TYPE_EXECUTE).waitingParallelism(parallelism).projectMessageModel(projectMessageModel).build());
|
|
@@ -412,10 +424,8 @@ public class ProjectService {
|
|
} else {
|
|
} else {
|
|
throw new RuntimeException("未知模型类型:" + modelType);
|
|
throw new RuntimeException("未知模型类型:" + modelType);
|
|
}
|
|
}
|
|
- } else if (DictConstants.PROJECT_RUN_TYPE_EXPAND.equals(runType)) {
|
|
|
|
-
|
|
|
|
} else {
|
|
} else {
|
|
- throw new RuntimeException("未知运行类型:" + runType);
|
|
|
|
|
|
+ throw new RuntimeException("未知运行类型:" + waitingType);
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
@@ -528,15 +538,15 @@ public class ProjectService {
|
|
}
|
|
}
|
|
// 只有准备启动(即 currentCount == 0)的时候才指定 cpu 编号
|
|
// 只有准备启动(即 currentCount == 0)的时候才指定 cpu 编号
|
|
log.info("创建任务 " + taskId + " 的 yaml:是否使用 gpu (0是1否)" + isChoiceGpu + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前节点名称为:" + currentNodeName + ",当前 cpu 编号为:" + cpuOrder);
|
|
log.info("创建任务 " + taskId + " 的 yaml:是否使用 gpu (0是1否)" + isChoiceGpu + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前节点名称为:" + currentNodeName + ",当前 cpu 编号为:" + cpuOrder);
|
|
- String yamlRedisKey = createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
|
|
|
|
-
|
|
|
|
|
|
+ String yamlRedisKey = projectDomainService.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
|
|
if (currentCount == 0) {
|
|
if (currentCount == 0) {
|
|
- String podName = yamlRedisKey.split(":")[yamlRedisKey.split(":").length - 1];
|
|
|
|
- log.info("将 POD :{} 加入到启动列表 ", podName);
|
|
|
|
yamlToRunRedisKeyList.add(yamlRedisKey);
|
|
yamlToRunRedisKeyList.add(yamlRedisKey);
|
|
|
|
+ } else {
|
|
|
|
+ yamlToWaitRedisKeyList.add(yamlRedisKey);
|
|
}
|
|
}
|
|
messageNumber++;
|
|
messageNumber++;
|
|
}
|
|
}
|
|
|
|
+ customRedisClient.set("project:" + projectId + ":waiting-queue", JsonUtil.listToJson(yamlToWaitRedisKeyList));
|
|
TimeUnit.SECONDS.sleep(6);
|
|
TimeUnit.SECONDS.sleep(6);
|
|
log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
|
|
log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
|
|
for (String redisKey : yamlToRunRedisKeyList) {
|
|
for (String redisKey : yamlToRunRedisKeyList) {
|
|
@@ -548,109 +558,6 @@ public class ProjectService {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
- //* -------------------------------- Comment --------------------------------
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 创建一个临时 yaml,node 在最后用 # 号隔开
|
|
|
|
- */
|
|
|
|
- @SneakyThrows
|
|
|
|
- public String createTempYaml(String projectId, String vehicleConfigId, String modelType, String algorithmDockerImage, String nodeName, int kafkaPartition, long kafkaOffset, String isChoiceGpu, Integer cpuOrder) {
|
|
|
|
- String podName = projectDomainService.getRandomPodName(projectId); // 生成 podName
|
|
|
|
- String podYaml = projectDomainService.getPodYamlName(nodeName, podName); // 模板文件名称
|
|
|
|
- String yamlPath = podYamlDirectory + podYaml;
|
|
|
|
- String finalYaml;
|
|
|
|
-
|
|
|
|
- if ("1".equals(modelType)) {
|
|
|
|
- String podString = FileUtil.read(new File(vtdPodTemplateYaml));
|
|
|
|
- String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
|
|
|
|
- String replace1 = replace0.replace("simulation-cloud-ip", simulationCloudIp);
|
|
|
|
- String replace2 = replace1.replace("kafka-ip", kafkaIp);
|
|
|
|
- String replace3 = replace2.replace("kafka-topic", projectId); // 消息主题名称为 projectId
|
|
|
|
- String replace4 = replace3.replace("kafka-partition", "\"" + kafkaPartition + "\"");
|
|
|
|
- String replace5 = replace4.replace("kafka-offset", "\"" + kafkaOffset + "\"");
|
|
|
|
- String replace6 = replace5.replace("minio-ip", minioConfiguration.getEndpointWithoutHttp());
|
|
|
|
- String replace7 = replace6.replace("minio-access-key", minioConfiguration.getAccessKey());
|
|
|
|
- String replace8 = replace7.replace("minio-secret-key", minioConfiguration.getSecretKey());
|
|
|
|
-
|
|
|
|
- String replace9 = replace8.replace("algorithm-container", "algorithm-" + projectId);
|
|
|
|
- String replace10 = replace9.replace("algorithm-image", algorithmDockerImage);
|
|
|
|
-
|
|
|
|
- String replace11 = replace10.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
|
|
|
|
- String replace12 = replace11.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
|
|
|
|
- String replace13 = replace12.replace("node-name", nodeName); // 指定 pod 运行节点
|
|
|
|
-
|
|
|
|
- String replace14;
|
|
|
|
- if (cpuOrder != null) {
|
|
|
|
- replace14 = replace13.replace("cpu-order", "\"" + cpuOrder + "\""); // 指定 cpu 编号
|
|
|
|
- stringRedisTemplate.opsForValue().set("project:" + projectId + ":pod:" + podName + ":cpu", String.valueOf(cpuOrder)); // pod 运行使用的 cpu编号
|
|
|
|
- } else {
|
|
|
|
- replace14 = replace13;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
|
|
|
|
- String replace15 = replace14.replace("vtd-image", kubernetesConfiguration.getImageVtdGpu());
|
|
|
|
- finalYaml = replace15.replace("vtd-command", kubernetesConfiguration.getCommandVtdGpu());
|
|
|
|
- } else if (DictConstants.NOT_USE_GPU.equals(isChoiceGpu)) {
|
|
|
|
- String replace15 = replace14.replace("vtd-image", kubernetesConfiguration.getImageVtdNogpu());
|
|
|
|
- finalYaml = replace15.replace("vtd-command", kubernetesConfiguration.getCommandVtdNogpu());
|
|
|
|
- } else {
|
|
|
|
- throw new RuntimeException("createTempYaml() 是否使用 gpu:" + isChoiceGpu);
|
|
|
|
- }
|
|
|
|
- } else if ("2".equals(modelType)) {
|
|
|
|
- String podString = FileUtil.read(new File(carsimPodTemplateYaml));
|
|
|
|
- String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
|
|
|
|
- String replace1 = replace0.replace("simulation-cloud-ip", simulationCloudIp);
|
|
|
|
- String replace2 = replace1.replace("kafka-ip", kafkaIp);
|
|
|
|
- String replace3 = replace2.replace("kafka-topic", projectId); // 消息主题名称为 projectId
|
|
|
|
- String replace4 = replace3.replace("kafka-partition", "\"" + kafkaPartition + "\"");
|
|
|
|
- String replace5 = replace4.replace("kafka-offset", "\"" + kafkaOffset + "\"");
|
|
|
|
- String replace6 = replace5.replaceAll("minio-ip", minioConfiguration.getEndpointWithoutHttp());
|
|
|
|
- String replace7 = replace6.replaceAll("minio-access-key", minioConfiguration.getAccessKey());
|
|
|
|
- String replace8 = replace7.replaceAll("minio-secret-key", minioConfiguration.getSecretKey());
|
|
|
|
-
|
|
|
|
- String replace9 = replace8.replace("algorithm-container", "algorithm-" + projectId);
|
|
|
|
- String replace10 = replace9.replace("algorithm-image", algorithmDockerImage);
|
|
|
|
-
|
|
|
|
- String replace11 = replace10.replace("carsim-container", "carsim-" + projectId);
|
|
|
|
- String replace12 = replace11.replace("carsim-image", kubernetesConfiguration.getCarsimImage());
|
|
|
|
- String replace13 = replace12.replace("carsim-command", kubernetesConfiguration.getCarsimCommand());
|
|
|
|
- String replace14 = replace13.replace("minio-bucket", minioConfiguration.getBucketName());
|
|
|
|
- String replace15 = replace14.replace("par-path", vehicleMapper.selectParPathByVehicleConfigId(vehicleConfigId));
|
|
|
|
-
|
|
|
|
- String replace16 = replace15.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
|
|
|
|
- String replace17 = replace16.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
|
|
|
|
- String replace18 = replace17.replace("node-name", nodeName); // 指定 pod 运行节点
|
|
|
|
-
|
|
|
|
- String replace19;
|
|
|
|
- if (cpuOrder != null) {
|
|
|
|
- replace19 = replace18.replace("cpu-order", "\"" + cpuOrder + "\""); // 指定 cpu 编号
|
|
|
|
- stringRedisTemplate.opsForValue().set("project:" + projectId + ":pod:" + podName + ":cpu", String.valueOf(cpuOrder)); // pod 运行使用的 cpu编号
|
|
|
|
- } else {
|
|
|
|
- replace19 = replace18;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
|
|
|
|
- log.info("k8s参数为:" + kubernetesConfiguration);
|
|
|
|
- log.info("yaml模板为:" + replace12);
|
|
|
|
- String replace20 = replace19.replace("vtd-image", kubernetesConfiguration.getImageVtdGpu());
|
|
|
|
- finalYaml = replace20.replace("vtd-command", kubernetesConfiguration.getCommandVtdCarsimGpu());
|
|
|
|
- } else if (DictConstants.NOT_USE_GPU.equals(isChoiceGpu)) {
|
|
|
|
- String replace20 = replace19.replace("vtd-image", kubernetesConfiguration.getImageVtdNogpu());
|
|
|
|
- finalYaml = replace20.replace("vtd-command", kubernetesConfiguration.getCommandVtdCarsimNogpu());
|
|
|
|
- } else {
|
|
|
|
- throw new RuntimeException("是否使用 gpu:" + isChoiceGpu);
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- throw new RuntimeException("模型类型错误:" + modelType);
|
|
|
|
- }
|
|
|
|
- log.info("保存项目 " + projectId + " 的 yaml 文件:" + yamlPath);
|
|
|
|
- FileUtil.writeStringToLocalFile(finalYaml, yamlPath);
|
|
|
|
- String yamlRedisKey = "project:" + projectId + ":node:" + nodeName + ":yaml:" + podName;
|
|
|
|
- stringRedisTemplate.opsForValue().set(yamlRedisKey, yamlPath);
|
|
|
|
- return yamlRedisKey;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
//* -------------------------------- 逻辑 --------------------------------
|
|
//* -------------------------------- 逻辑 --------------------------------
|
|
|
|
|
|
|
|
|