|
@@ -312,7 +312,7 @@ public class ProjectConsumer {
|
|
|
if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
log.info("ProjectConsumer--cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
|
PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
- run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), userId);
|
|
|
+ run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
return;
|
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
clusterPO = clusterMapper.selectByUserId(userId);
|
|
@@ -339,7 +339,7 @@ public class ProjectConsumer {
|
|
|
int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
|
|
|
// 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
|
if (currentParallelismSum + parallelism <= simulationLicenseNumber) {
|
|
|
- run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), userId);
|
|
|
+ run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
} else {
|
|
|
wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
|
|
|
}
|
|
@@ -351,7 +351,7 @@ public class ProjectConsumer {
|
|
|
* @param projectRunningKey projectRunningKey
|
|
|
* @param projectWaitingKey projectWaitingKey
|
|
|
*/
|
|
|
- public void run(ProjectMessageDTO projectMessageDTO, String clusterId, String projectRunningKey, String projectWaitingKey, String userId) {
|
|
|
+ public void run(ProjectMessageDTO projectMessageDTO, String clusterId, String projectRunningKey, String projectWaitingKey) {
|
|
|
|
|
|
String projectId = projectMessageDTO.getProjectId();
|
|
|
int parallelism = projectMessageDTO.getParallelism(); // 期望并行度
|
|
@@ -362,7 +362,7 @@ public class ProjectConsumer {
|
|
|
log.info("ProjectConsumer--run 集群 " + clusterId + " 执行项目 " + projectId);
|
|
|
// 设置实际的并行度
|
|
|
projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism)); // 设置实际的并行度
|
|
|
- parseProject(projectMessageDTO, projectWaitingKey, projectRunningKey, userId);
|
|
|
+ parseProject(projectMessageDTO, projectRunningKey);
|
|
|
} else {
|
|
|
log.info("ProjectConsumer--cacheManualProject 服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
|
|
|
wait(projectWaitingKey, projectMessageDTO);
|
|
@@ -381,12 +381,10 @@ public class ProjectConsumer {
|
|
|
|
|
|
/**
|
|
|
* @param projectMessageDTO 初始接收到的项目启动信息
|
|
|
- * @param projectWaitingKey projectWaitingKey
|
|
|
* @param projectRunningKey projectRunningKey
|
|
|
*/
|
|
|
@SneakyThrows
|
|
|
- public void parseProject(ProjectMessageDTO projectMessageDTO, String projectWaitingKey, String projectRunningKey,
|
|
|
- String userId) {
|
|
|
+ public void parseProject(ProjectMessageDTO projectMessageDTO, String projectRunningKey) {
|
|
|
String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
ProjectPO projectPO = projectUtil.getProjectByProjectId(projectId);
|
|
|
log.info("项目 " + projectId + " 信息为:" + projectPO);
|