|
@@ -400,38 +400,38 @@ public class ProjectConsumer {
|
|
} else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
} else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
} else {
|
|
} else {
|
|
- log.error("cacheManualProject 项目类型错误:" + initialProjectJson);
|
|
|
|
|
|
+ log.error("cacheManualProject() 项目类型错误:" + initialProjectJson);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
if (StringUtil.isEmpty(userId)) {
|
|
if (StringUtil.isEmpty(userId)) {
|
|
- log.error("cacheManualProject 未查询到项目创建人:" + initialProjectJson);
|
|
|
|
|
|
+ log.error("cacheManualProject() 未查询到项目创建人:" + initialProjectJson);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
//3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
//3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
UserPO userPO = userMapper.selectById(userId);
|
|
UserPO userPO = userMapper.selectById(userId);
|
|
- log.info("cacheManualProject 项目 " + projectId + " 的创建人为:" + userPO);
|
|
|
|
|
|
+ log.info("cacheManualProject() 项目 " + projectId + " 的创建人为:" + userPO);
|
|
String roleCode = userPO.getRoleCode();
|
|
String roleCode = userPO.getRoleCode();
|
|
String useType = userPO.getUseType();
|
|
String useType = userPO.getUseType();
|
|
ClusterPO clusterPO;
|
|
ClusterPO clusterPO;
|
|
if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
- log.info("cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
|
|
|
|
+ log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
return;
|
|
return;
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
clusterPO = clusterMapper.selectByUserId(userId);
|
|
clusterPO = clusterMapper.selectByUserId(userId);
|
|
- log.info("cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterPO);
|
|
|
|
|
|
+ log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterPO);
|
|
} else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
} else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) { //3-3 普通子账户,根据自己的独占节点排队
|
|
if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) { //3-3 普通子账户,根据自己的独占节点排队
|
|
clusterPO = clusterMapper.selectByUserId(userId);
|
|
clusterPO = clusterMapper.selectByUserId(userId);
|
|
- log.info("cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为普通独占子账户(自己的集群),集群为:" + clusterPO);
|
|
|
|
|
|
+ log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通独占子账户(自己的集群),集群为:" + clusterPO);
|
|
} else { //3-4 共享子账户,根据父账户的共享节点排队
|
|
} else { //3-4 共享子账户,根据父账户的共享节点排队
|
|
String parentUserId = userPO.getCreateUserId();
|
|
String parentUserId = userPO.getCreateUserId();
|
|
clusterPO = clusterMapper.selectByUserId(parentUserId);
|
|
clusterPO = clusterMapper.selectByUserId(parentUserId);
|
|
- log.info("cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为普通共享子账户(父账户的集群),集群为:" + clusterPO);
|
|
|
|
|
|
+ log.info("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为普通共享子账户(父账户的集群),集群为:" + clusterPO);
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- log.error("cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
|
|
|
|
|
|
+ log.error("cacheManualProject() 项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
// 获取拥有的节点数量,即仿真软件证书数量
|
|
// 获取拥有的节点数量,即仿真软件证书数量
|
|
@@ -463,12 +463,12 @@ public class ProjectConsumer {
|
|
int restParallelism = projectUtil.getRestParallelism();
|
|
int restParallelism = projectUtil.getRestParallelism();
|
|
//2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
|
|
//2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
|
|
if (restParallelism > 0L) {
|
|
if (restParallelism > 0L) {
|
|
- log.info("run 集群 " + clusterId + " 执行项目 " + projectId);
|
|
|
|
|
|
+ log.info("run() 集群 " + clusterId + " 执行项目 " + projectId);
|
|
// 设置实际的并行度
|
|
// 设置实际的并行度
|
|
projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism)); // 设置实际的并行度
|
|
projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism)); // 设置实际的并行度
|
|
parseProject(projectMessageDTO, projectRunningKey);
|
|
parseProject(projectMessageDTO, projectRunningKey);
|
|
} else {
|
|
} else {
|
|
- log.info("cacheManualProject 服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
|
|
|
|
|
|
+ log.info("run() 服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
|
|
wait(projectWaitingKey, projectMessageDTO);
|
|
wait(projectWaitingKey, projectMessageDTO);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -493,7 +493,7 @@ public class ProjectConsumer {
|
|
String modelType = projectMessageDTO.getModelType();
|
|
String modelType = projectMessageDTO.getModelType();
|
|
String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
|
|
String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
|
|
ProjectPO projectPO = projectUtil.getProjectByProjectId(projectId);
|
|
ProjectPO projectPO = projectUtil.getProjectByProjectId(projectId);
|
|
- log.info("项目 " + projectId + " 信息为:" + projectPO);
|
|
|
|
|
|
+ log.info("parseProject() 项目 " + projectId + " 信息为:" + projectPO);
|
|
String isChoiceGpu = projectPO.getIsChoiceGpu();
|
|
String isChoiceGpu = projectPO.getIsChoiceGpu();
|
|
// 项目类型
|
|
// 项目类型
|
|
int currentParallelism = projectMessageDTO.getCurrentParallelism(); // 当前并行度
|
|
int currentParallelism = projectMessageDTO.getCurrentParallelism(); // 当前并行度
|
|
@@ -529,13 +529,13 @@ public class ProjectConsumer {
|
|
// 重新设置实际使用的并行度并保存到 redis
|
|
// 重新设置实际使用的并行度并保存到 redis
|
|
int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
|
|
int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
|
|
projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
|
|
projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
|
|
- log.info("ProjectConsume--parseProject 项目 " + projectId + " 运行在:" + nodeMap);
|
|
|
|
|
|
+ log.info("parseProject() 项目 " + projectId + " 运行在:" + nodeMap);
|
|
stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
|
|
stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
|
|
//* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
|
|
//* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
|
|
String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
|
|
String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
|
|
// -------------------------------- 4 发送任务消息 --------------------------------
|
|
// -------------------------------- 4 发送任务消息 --------------------------------
|
|
List<NodeTO> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
|
|
List<NodeTO> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
|
|
- log.info("项目 " + projectId + " 运行在:" + nodeListToCount);
|
|
|
|
|
|
+ log.info("parseProject() 项目 " + projectId + " 运行在:" + nodeListToCount);
|
|
int messageNumber = 0;
|
|
int messageNumber = 0;
|
|
ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1); // 创建主题
|
|
ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1); // 创建主题
|
|
TimeUnit.SECONDS.sleep(6);
|
|
TimeUnit.SECONDS.sleep(6);
|
|
@@ -557,7 +557,7 @@ public class ProjectConsumer {
|
|
String topic = recordMetadata.topic(); // 消息发送到的topic
|
|
String topic = recordMetadata.topic(); // 消息发送到的topic
|
|
int partition = recordMetadata.partition(); // 消息发送到的分区
|
|
int partition = recordMetadata.partition(); // 消息发送到的分区
|
|
long offset = recordMetadata.offset(); // 消息在分区内的offset
|
|
long offset = recordMetadata.offset(); // 消息在分区内的offset
|
|
- log.info("------- ProjectConsumer 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:"
|
|
|
|
|
|
+ log.info("parseProject() 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:"
|
|
+ partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
|
|
+ partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
|
|
//4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
|
|
//4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
|
|
// 选一个 count 最少的 node
|
|
// 选一个 count 最少的 node
|