|
@@ -370,7 +370,7 @@ public class ProjectUtil {
|
|
clusterId = clusterMapper.selectByUserId(parentUserId).getId();
|
|
clusterId = clusterMapper.selectByUserId(parentUserId).getId();
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- throw new RuntimeException("ProjectUtil--getRedisPrefixByUserIdAndProjectIdAndTaksId 未知账户类型,无法获取集群信息!");
|
|
|
|
|
|
+ throw new RuntimeException("未知账户类型,无法获取集群信息。");
|
|
}
|
|
}
|
|
String clusterPrefix = "cluster:" + clusterId;
|
|
String clusterPrefix = "cluster:" + clusterId;
|
|
String clusterRunningPrefix = clusterPrefix + ":running";
|
|
String clusterRunningPrefix = clusterPrefix + ":running";
|
|
@@ -393,44 +393,6 @@ public class ProjectUtil {
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
- public PrefixTO getRedisPrefixByUserIdAndProjectId(String userId, String projectId) {
|
|
|
|
- //3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
|
|
- UserPO userPO = userMapper.selectById(userId);
|
|
|
|
- String roleCode = userPO.getRoleCode();
|
|
|
|
- String useType = userPO.getUseType();
|
|
|
|
- String clusterId;
|
|
|
|
- if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
|
- clusterId = DictConstants.SYSTEM_CLUSTER_ID;
|
|
|
|
- } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
|
- clusterId = clusterMapper.selectByUserId(userId).getId();
|
|
|
|
- } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
|
|
- if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) { //3-3 普通子账户,根据自己的独占节点排队
|
|
|
|
- clusterId = clusterMapper.selectByUserId(userId).getId();
|
|
|
|
- } else { //3-4 共享子账户,根据父账户的共享节点排队
|
|
|
|
- String parentUserId = userPO.getCreateUserId();
|
|
|
|
- clusterId = clusterMapper.selectByUserId(parentUserId).getId();
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- throw new RuntimeException("ProjectUtil--getRedisPrefixByUserIdAndProjectIdAndTaksId 未知账户类型,无法获取集群信息!");
|
|
|
|
- }
|
|
|
|
- String clusterPrefix = "cluster:" + clusterId;
|
|
|
|
- String clusterRunningPrefix = clusterPrefix + ":running";
|
|
|
|
- String clusterWaitingPrefix = clusterPrefix + ":waiting";
|
|
|
|
- String projectRunningKey = clusterRunningPrefix + ":" + projectId;
|
|
|
|
- String projectWaitingKey = clusterWaitingPrefix + ":" + projectId;
|
|
|
|
- String projectCheckKey = projectRunningKey + ":check";
|
|
|
|
-
|
|
|
|
- return PrefixTO.builder()
|
|
|
|
- .clusterPrefix(clusterPrefix)
|
|
|
|
- .clusterRunningPrefix(clusterRunningPrefix)
|
|
|
|
- .clusterWaitingPrefix(clusterWaitingPrefix)
|
|
|
|
- .projectRunningKey(projectRunningKey)
|
|
|
|
- .projectWaitingKey(projectWaitingKey)
|
|
|
|
- .projectCheckKey(projectCheckKey)
|
|
|
|
- .build();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
public PrefixTO getRedisPrefixByClusterIdAndProjectId(String clusterId, String projectId) {
|
|
public PrefixTO getRedisPrefixByClusterIdAndProjectId(String clusterId, String projectId) {
|
|
String clusterPrefix = "cluster:" + clusterId;
|
|
String clusterPrefix = "cluster:" + clusterId;
|
|
String clusterRunningPrefix = clusterPrefix + ":running";
|
|
String clusterRunningPrefix = clusterPrefix + ":running";
|
|
@@ -461,10 +423,6 @@ public class ProjectUtil {
|
|
return clusterRunningKeySet.stream().filter(key -> StringUtil.countSubString(key, ":") == 3).collect(Collectors.toList());
|
|
return clusterRunningKeySet.stream().filter(key -> StringUtil.countSubString(key, ":") == 3).collect(Collectors.toList());
|
|
}
|
|
}
|
|
|
|
|
|
- public void sendMessage(String topic, String message) {
|
|
|
|
- kafkaTemplate.send(topic, message);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
|
|
public PrefixTO getRedisPrefixByProjectIdAndProjectType(String projectId, String projectType) {
|
|
public PrefixTO getRedisPrefixByProjectIdAndProjectType(String projectId, String projectType) {
|
|
String userId;
|
|
String userId;
|
|
@@ -473,7 +431,7 @@ public class ProjectUtil {
|
|
} else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
} else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
} else {
|
|
} else {
|
|
- throw new RuntimeException("getRedisPrefixByProjectIdAndProjectType() 未知的项目类型!");
|
|
|
|
|
|
+ throw new RuntimeException("未知的项目类型。");
|
|
}
|
|
}
|
|
|
|
|
|
//3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
//3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
@@ -493,7 +451,7 @@ public class ProjectUtil {
|
|
clusterId = clusterMapper.selectByUserId(parentUserId).getId();
|
|
clusterId = clusterMapper.selectByUserId(parentUserId).getId();
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- throw new RuntimeException("ProjectUtil--getRedisPrefixByUserIdAndProjectIdAndTaksId 未知账户类型,无法获取集群信息!");
|
|
|
|
|
|
+ throw new RuntimeException("未知账户类型,无法获取集群信息!");
|
|
}
|
|
}
|
|
String clusterPrefix = "cluster:" + clusterId;
|
|
String clusterPrefix = "cluster:" + clusterId;
|
|
String clusterRunningPrefix = clusterPrefix + ":running";
|
|
String clusterRunningPrefix = clusterPrefix + ":running";
|
|
@@ -522,14 +480,23 @@ public class ProjectUtil {
|
|
String key = "node:" + nodeName + ":parallelism";
|
|
String key = "node:" + nodeName + ":parallelism";
|
|
String parallelismString = stringRedisTemplate.opsForValue().get(key);
|
|
String parallelismString = stringRedisTemplate.opsForValue().get(key);
|
|
if (StringUtil.isEmpty(parallelismString)) {
|
|
if (StringUtil.isEmpty(parallelismString)) {
|
|
- throw new RuntimeException("addOneParallelismToNode() redisKey " + key + " 为空。");
|
|
|
|
|
|
+ throw new RuntimeException("redisKey " + key + " 为空。");
|
|
}
|
|
}
|
|
final int parallelismBefore = Integer.parseInt(parallelismString);
|
|
final int parallelismBefore = Integer.parseInt(parallelismString);
|
|
final int parallelismAfter = parallelismBefore + 1;
|
|
final int parallelismAfter = parallelismBefore + 1;
|
|
stringRedisTemplate.opsForValue().set(key, parallelismAfter + "");
|
|
stringRedisTemplate.opsForValue().set(key, parallelismAfter + "");
|
|
- log.info("addOneParallelismToNode() 归还节点 " + nodeName + " 并行度:" + parallelismBefore + " --> " + parallelismAfter);
|
|
|
|
|
|
+ log.info("归还节点 " + nodeName + " 并行度:" + parallelismBefore + " --> " + parallelismAfter);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public void parallelismAddOne(String nodeName){
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void parallelismReduceOne(String nodeName){
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
public List<String> getWaitingProjectMessageKeys() {
|
|
public List<String> getWaitingProjectMessageKeys() {
|
|
final Set<String> keys = stringRedisTemplate.keys("*");
|
|
final Set<String> keys = stringRedisTemplate.keys("*");
|
|
if (CollectionUtil.isEmpty(keys)) {
|
|
if (CollectionUtil.isEmpty(keys)) {
|