|
@@ -6,7 +6,9 @@ import api.common.pojo.dto.ProjectMessageDTO;
|
|
|
import api.common.util.*;
|
|
|
import com.css.simulation.resource.scheduler.mapper.*;
|
|
|
import com.css.simulation.resource.scheduler.pojo.po.*;
|
|
|
+import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
|
|
|
import com.css.simulation.resource.scheduler.service.ManualProjectService;
|
|
|
+import com.css.simulation.resource.scheduler.util.ProjectUtil;
|
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import lombok.SneakyThrows;
|
|
@@ -61,6 +63,8 @@ public class ProjectConsumer {
|
|
|
ClusterMapper clusterMapper;
|
|
|
@Autowired
|
|
|
ManualProjectService manualProjectService;
|
|
|
+ @Autowired
|
|
|
+ ProjectUtil projectUtil;
|
|
|
// @Autowired
|
|
|
// ApiClient apiClient;
|
|
|
@Value("${scheduler.manual-project.topic}")
|
|
@@ -114,16 +118,9 @@ public class ProjectConsumer {
|
|
|
String roleCode = userPO.getRoleCode();
|
|
|
String useType = userPO.getUseType();
|
|
|
ClusterPO clusterPO;
|
|
|
- String clusterPrefix;
|
|
|
- String clusterRunningPrefix;
|
|
|
- String clusterWaitingPrefix;
|
|
|
- String projectRunningKey;
|
|
|
- String projectWaitingKey;
|
|
|
if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
- clusterPrefix = "cluster:" + DictConstants.SYSTEM_CLUSTER_ID;
|
|
|
- clusterRunningPrefix = clusterPrefix + ":running";
|
|
|
- projectRunningKey = clusterRunningPrefix + ":" + projectId;
|
|
|
- run(DictConstants.SYSTEM_CLUSTER_ID, projectId, projectRunningKey, projectJson);
|
|
|
+ PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
+ run(DictConstants.SYSTEM_CLUSTER_ID, projectId, redisPrefix.getProjectRunningKey(), projectJson);
|
|
|
return;
|
|
|
} else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
clusterPO = clusterMapper.selectByUserId(userId);
|
|
@@ -142,14 +139,10 @@ public class ProjectConsumer {
|
|
|
String clusterId = clusterPO.getId();
|
|
|
int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
|
|
|
// 获取该集群中正在运行的项目,如果没有则立即执行
|
|
|
- clusterPrefix = "cluster:" + clusterId;
|
|
|
- clusterRunningPrefix = clusterPrefix + ":running";
|
|
|
- clusterWaitingPrefix = clusterPrefix + ":waiting";
|
|
|
- projectRunningKey = clusterRunningPrefix + ":" + projectId;
|
|
|
- projectWaitingKey = clusterWaitingPrefix + ":" + projectId;
|
|
|
- Set<String> runningProjectSet = redisTemplate.keys(clusterRunningPrefix + "*");
|
|
|
+ PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
|
|
|
+ Set<String> runningProjectSet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
|
|
|
if (CollectionUtil.isEmpty(runningProjectSet)) {
|
|
|
- run(clusterId, projectId, projectRunningKey, projectJson);
|
|
|
+ run(clusterId, projectId, redisPrefix.getProjectRunningKey(), projectJson);
|
|
|
return;
|
|
|
}
|
|
|
// 计算正在运行的项目的并行度总和
|
|
@@ -161,9 +154,9 @@ public class ProjectConsumer {
|
|
|
}
|
|
|
// 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
|
if (parallelismSum + parallelism <= simulationLicenseNumber) {
|
|
|
- run(clusterId, projectId, projectRunningKey, projectJson);
|
|
|
+ run(clusterId, projectId, redisPrefix.getProjectRunningKey(), projectJson);
|
|
|
} else {
|
|
|
- wait(clusterId, projectId, projectWaitingKey, projectJson);
|
|
|
+ wait(clusterId, projectId, redisPrefix.getProjectWaitingKey(), projectJson);
|
|
|
}
|
|
|
}
|
|
|
|