|
@@ -150,7 +150,7 @@ public class ProjectUtil {
|
|
final String yamlPathCacheKey = new ArrayList<>(yamlPathCacheKeySet).get(0);
|
|
final String yamlPathCacheKey = new ArrayList<>(yamlPathCacheKeySet).get(0);
|
|
final String absolutePath = stringRedisTemplate.opsForValue().get(yamlPathCacheKey);
|
|
final String absolutePath = stringRedisTemplate.opsForValue().get(yamlPathCacheKey);
|
|
// 修改 cpu 编号
|
|
// 修改 cpu 编号
|
|
- Optional.ofNullable(cpuOrderString).orElseThrow(() -> new RuntimeException("createNextPod2() pod " + lastPodName + " 缓存的 cpu 编号为空。"));
|
|
|
|
|
|
+ Optional.ofNullable(cpuOrderString).orElseThrow(() -> new RuntimeException("pod " + lastPodName + " 缓存的 cpu 编号为空。"));
|
|
final String read = FileUtil.read(absolutePath);
|
|
final String read = FileUtil.read(absolutePath);
|
|
final String replace = read.replace("cpu-order", "\"" + cpuOrderString + "\"");
|
|
final String replace = read.replace("cpu-order", "\"" + cpuOrderString + "\"");
|
|
FileUtil.writeStringToLocalFile(replace, absolutePath);
|
|
FileUtil.writeStringToLocalFile(replace, absolutePath);
|
|
@@ -237,70 +237,168 @@ public class ProjectUtil {
|
|
*
|
|
*
|
|
* @return 节点映射(节点名,并行度)
|
|
* @return 节点映射(节点名,并行度)
|
|
*/
|
|
*/
|
|
- public Map<String, Integer> getNodeMap() {
|
|
|
|
- List<NodeModel> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
|
|
|
|
- log.info("预设并行度的节点列表为:" + initialNodeList);
|
|
|
|
|
|
+ public Map<String, Integer> getNodeMap(String isChoiceGpu) {
|
|
Map<String, Integer> resultNodeMap = new HashMap<>(); // 用于执行的节点映射(节点名,并行度)
|
|
Map<String, Integer> resultNodeMap = new HashMap<>(); // 用于执行的节点映射(节点名,并行度)
|
|
- for (NodeModel kubernetesNodeSource : initialNodeList) {
|
|
|
|
- NodeModel kubernetesNodeCopy = kubernetesNodeSource.clone();
|
|
|
|
- String nodeName = kubernetesNodeCopy.getHostname();
|
|
|
|
- int maxParallelism = kubernetesNodeCopy.getParallelism();
|
|
|
|
- String restParallelismKey = "gpu-node:" + nodeName + ":parallelism";
|
|
|
|
- String restParallelismString = stringRedisTemplate.opsForValue().get(restParallelismKey);
|
|
|
|
- int restParallelism;
|
|
|
|
- if (restParallelismString == null) { // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
|
|
|
|
- restParallelism = maxParallelism;
|
|
|
|
- stringRedisTemplate.opsForValue().set(restParallelismKey, restParallelism + "");
|
|
|
|
- } else {
|
|
|
|
- restParallelism = Integer.parseInt(restParallelismString);
|
|
|
|
|
|
+ if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
|
|
|
|
+ List<NodeModel> initialNodeList = kubernetesConfiguration.getGpuNodeList();
|
|
|
|
+ log.info("预设并行度的GPU节点列表为:" + initialNodeList);
|
|
|
|
+ for (NodeModel kubernetesNodeSource : initialNodeList) {
|
|
|
|
+ NodeModel kubernetesNodeCopy = kubernetesNodeSource.clone();
|
|
|
|
+ String nodeName = kubernetesNodeCopy.getHostname();
|
|
|
|
+ int maxParallelism = kubernetesNodeCopy.getParallelism();
|
|
|
|
+ String restParallelismKey = "gpu-node:" + nodeName + ":parallelism";
|
|
|
|
+ String restParallelismString = stringRedisTemplate.opsForValue().get(restParallelismKey);
|
|
|
|
+ int restParallelism;
|
|
|
|
+ if (restParallelismString == null) { // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
|
|
|
|
+ restParallelism = maxParallelism;
|
|
|
|
+ stringRedisTemplate.opsForValue().set(restParallelismKey, String.valueOf(restParallelism));
|
|
|
|
+ } else {
|
|
|
|
+ restParallelism = Integer.parseInt(restParallelismString);
|
|
|
|
+ }
|
|
|
|
+ resultNodeMap.put(nodeName, restParallelism);
|
|
|
|
+ }
|
|
|
|
+ log.info("剩余并行度的GPU节点列表为:" + resultNodeMap);
|
|
|
|
+ } else if (DictConstants.USE_CPU.equals(isChoiceGpu)) {
|
|
|
|
+ List<NodeModel> initialNodeList = kubernetesConfiguration.getCpuNodeList();
|
|
|
|
+ log.info("预设并行度的CPU节点列表为:" + initialNodeList);
|
|
|
|
+ for (NodeModel kubernetesNodeSource : initialNodeList) {
|
|
|
|
+ NodeModel kubernetesNodeCopy = kubernetesNodeSource.clone();
|
|
|
|
+ String nodeName = kubernetesNodeCopy.getHostname();
|
|
|
|
+ int maxParallelism = kubernetesNodeCopy.getParallelism();
|
|
|
|
+ String restParallelismKey = "cpu-node:" + nodeName + ":parallelism";
|
|
|
|
+ String restParallelismString = stringRedisTemplate.opsForValue().get(restParallelismKey);
|
|
|
|
+ int restParallelism;
|
|
|
|
+ if (restParallelismString == null) { // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
|
|
|
|
+ restParallelism = maxParallelism;
|
|
|
|
+ stringRedisTemplate.opsForValue().set(restParallelismKey, String.valueOf(restParallelism));
|
|
|
|
+ } else {
|
|
|
|
+ restParallelism = Integer.parseInt(restParallelismString);
|
|
|
|
+ }
|
|
|
|
+ resultNodeMap.put(nodeName, restParallelism);
|
|
}
|
|
}
|
|
- resultNodeMap.put(nodeName, restParallelism);
|
|
|
|
|
|
+ log.info("剩余并行度的CPU节点列表为:" + resultNodeMap);
|
|
}
|
|
}
|
|
- log.info("剩余并行度的节点列表为:" + resultNodeMap);
|
|
|
|
return resultNodeMap;
|
|
return resultNodeMap;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
- * 获取集群剩余并行度
|
|
|
|
|
|
+ * 根据并行度获取用于执行的节点列表
|
|
|
|
+ * 根据剩余可用并行度降序排序
|
|
*
|
|
*
|
|
- * @return 集群剩余并行度
|
|
|
|
|
|
+ * @return 节点映射(节点名,并行度)
|
|
*/
|
|
*/
|
|
- public int getRestParallelism() {
|
|
|
|
- List<NodeModel> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
|
|
|
|
- // 遍历所有节点,获取还有剩余并行度的节点
|
|
|
|
- List<NodeModel> restNodeList = new ArrayList<>(); // 剩余并行度的节点列表
|
|
|
|
- for (NodeModel kubernetesNodeSource : initialNodeList) {
|
|
|
|
- NodeModel kubernetesNodeCopy = kubernetesNodeSource.clone();
|
|
|
|
- String nodeName = kubernetesNodeCopy.getHostname(); // 节点名称
|
|
|
|
- int maxParallelism = kubernetesNodeCopy.getParallelism();
|
|
|
|
- String restParallelismString = stringRedisTemplate.opsForValue().get("gpu-node:" + nodeName + ":parallelism");// 获取节点剩余并行度的 key
|
|
|
|
- // -------------------------------- Comment --------------------------------
|
|
|
|
- int restParallelism;
|
|
|
|
- if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) { // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
|
|
|
|
- restParallelism = maxParallelism;
|
|
|
|
- stringRedisTemplate.opsForValue().set("gpu-node:" + nodeName + ":parallelism", restParallelism + "");
|
|
|
|
- } else {
|
|
|
|
- restParallelism = Integer.parseInt(restParallelismString);
|
|
|
|
- kubernetesNodeCopy.setParallelism(restParallelism);
|
|
|
|
|
|
+ public Map<String, Integer> getNodeMapToUse(String isChoiceGpu, int parallelism) {
|
|
|
|
+ List<NodeModel> initialNodeList; // 预设并行度的节点列表
|
|
|
|
+ if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
|
|
|
|
+ initialNodeList = kubernetesConfiguration.getGpuNodeList();
|
|
|
|
+ log.info("预设并行度的节点列表为:" + initialNodeList);
|
|
|
|
+ // 遍历所有节点,获取还有剩余并行度的节点
|
|
|
|
+ List<NodeModel> restNodeList = new ArrayList<>(); // 剩余并行度的节点列表
|
|
|
|
+ for (NodeModel kubernetesNodeSource : initialNodeList) {
|
|
|
|
+ NodeModel kubernetesNodeCopy = kubernetesNodeSource.clone();
|
|
|
|
+ String nodeName = kubernetesNodeCopy.getHostname(); // 节点名称
|
|
|
|
+ int maxParallelism = kubernetesNodeCopy.getParallelism();
|
|
|
|
+ String restParallelismString = stringRedisTemplate.opsForValue().get("gpu-node:" + nodeName + ":parallelism");// 获取节点剩余并行度的 key
|
|
|
|
+ // -------------------------------- Comment --------------------------------
|
|
|
|
+ int restParallelism;
|
|
|
|
+ if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) { // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
|
|
|
|
+ restParallelism = maxParallelism;
|
|
|
|
+ stringRedisTemplate.opsForValue().set("gpu-node:" + nodeName + ":parallelism", String.valueOf(restParallelism));
|
|
|
|
+ } else {
|
|
|
|
+ restParallelism = Integer.parseInt(restParallelismString);
|
|
|
|
+ kubernetesNodeCopy.setParallelism(restParallelism);
|
|
|
|
+ }
|
|
|
|
+ if (restParallelism > 0) {
|
|
|
|
+ restNodeList.add(kubernetesNodeCopy);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- if (restParallelism > 0) {
|
|
|
|
- restNodeList.add(kubernetesNodeCopy);
|
|
|
|
|
|
+ log.info("剩余并行度的节点列表为:" + restNodeList);
|
|
|
|
+ Map<String, Integer> resultNodeMap = new HashMap<>(); // 用于执行的节点映射(节点名,并行度)
|
|
|
|
+ if (!CollectionUtil.isEmpty(restNodeList)) {
|
|
|
|
+ if (restNodeList.size() == 1) {
|
|
|
|
+ NodeModel tempNode = restNodeList.get(0);
|
|
|
|
+ String tempNodeName = tempNode.getHostname();
|
|
|
|
+ int tempParallelism = tempNode.getParallelism();
|
|
|
|
+ resultNodeMap.put(tempNodeName, Math.min(tempParallelism, parallelism));
|
|
|
|
+ }
|
|
|
|
+ if (restNodeList.size() > 1) {
|
|
|
|
+ for (int i = 0; i < parallelism; i++) {
|
|
|
|
+ // 每次降序排序都取剩余并行度最大的一个。
|
|
|
|
+ restNodeList.sort((o1, o2) -> o2.getParallelism() - o1.getParallelism());
|
|
|
|
+ NodeModel tempNode = restNodeList.get(0);
|
|
|
|
+ String tempNodeName = tempNode.getHostname();
|
|
|
|
+ int tempParallelism = tempNode.getParallelism();
|
|
|
|
+ if (tempParallelism > 0) {
|
|
|
|
+ tempNode.setParallelism(tempParallelism - 1);
|
|
|
|
+ CollectionUtil.addValueToMap(resultNodeMap, 1, tempNodeName);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ log.info("即将使用节点的并行度为:" + resultNodeMap);
|
|
|
|
+ return resultNodeMap;
|
|
|
|
+ } else if (DictConstants.USE_CPU.equals(isChoiceGpu)) {
|
|
|
|
+ initialNodeList = kubernetesConfiguration.getCpuNodeList();
|
|
|
|
+ log.info("预设并行度的节点列表为:" + initialNodeList);
|
|
|
|
+ // 遍历所有节点,获取还有剩余并行度的节点
|
|
|
|
+ List<NodeModel> restNodeList = new ArrayList<>(); // 剩余并行度的节点列表
|
|
|
|
+ for (NodeModel kubernetesNodeSource : initialNodeList) {
|
|
|
|
+ NodeModel kubernetesNodeCopy = kubernetesNodeSource.clone();
|
|
|
|
+ String nodeName = kubernetesNodeCopy.getHostname(); // 节点名称
|
|
|
|
+ int maxParallelism = kubernetesNodeCopy.getParallelism();
|
|
|
|
+ String restParallelismString = stringRedisTemplate.opsForValue().get("cpu-node:" + nodeName + ":parallelism");// 获取节点剩余并行度的 key
|
|
|
|
+ // -------------------------------- Comment --------------------------------
|
|
|
|
+ int restParallelism;
|
|
|
|
+ if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) { // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大并行度的预设值
|
|
|
|
+ restParallelism = maxParallelism;
|
|
|
|
+ stringRedisTemplate.opsForValue().set("cpu-node:" + nodeName + ":parallelism", String.valueOf(restParallelism));
|
|
|
|
+ } else {
|
|
|
|
+ restParallelism = Integer.parseInt(restParallelismString);
|
|
|
|
+ kubernetesNodeCopy.setParallelism(restParallelism);
|
|
|
|
+ }
|
|
|
|
+ if (restParallelism > 0) {
|
|
|
|
+ restNodeList.add(kubernetesNodeCopy);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ log.info("剩余并行度的节点列表为:" + restNodeList);
|
|
|
|
+ Map<String, Integer> resultNodeMap = new HashMap<>(); // 用于执行的节点映射(节点名,并行度)
|
|
|
|
+ if (!CollectionUtil.isEmpty(restNodeList)) {
|
|
|
|
+ if (restNodeList.size() == 1) {
|
|
|
|
+ NodeModel tempNode = restNodeList.get(0);
|
|
|
|
+ String tempNodeName = tempNode.getHostname();
|
|
|
|
+ int tempParallelism = tempNode.getParallelism();
|
|
|
|
+ resultNodeMap.put(tempNodeName, Math.min(tempParallelism, parallelism));
|
|
|
|
+ }
|
|
|
|
+ if (restNodeList.size() > 1) {
|
|
|
|
+ for (int i = 0; i < parallelism; i++) {
|
|
|
|
+ // 每次降序排序都取剩余并行度最大的一个。
|
|
|
|
+ restNodeList.sort((o1, o2) -> o2.getParallelism() - o1.getParallelism());
|
|
|
|
+ NodeModel tempNode = restNodeList.get(0);
|
|
|
|
+ String tempNodeName = tempNode.getHostname();
|
|
|
|
+ int tempParallelism = tempNode.getParallelism();
|
|
|
|
+ if (tempParallelism > 0) {
|
|
|
|
+ tempNode.setParallelism(tempParallelism - 1);
|
|
|
|
+ CollectionUtil.addValueToMap(resultNodeMap, 1, tempNodeName);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ log.info("即将使用节点的并行度为:" + resultNodeMap);
|
|
|
|
+ return resultNodeMap;
|
|
|
|
+ } else {
|
|
|
|
+ throw new RuntimeException("未知是否使用 GPU:" + isChoiceGpu);
|
|
}
|
|
}
|
|
- log.info(" 集群剩余并行度为:" + restNodeList);
|
|
|
|
- return restNodeList.size() == 0 ? 0 : restNodeList.stream().mapToInt(NodeModel::getParallelism).sum();
|
|
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * 根据并行度获取用于执行的节点列表
|
|
|
|
- * 根据剩余可用并行度降序排序
|
|
|
|
|
|
+ * 获取集群剩余并行度
|
|
*
|
|
*
|
|
- * @return 节点映射(节点名,并行度)
|
|
|
|
|
|
+ * @return 集群剩余并行度
|
|
*/
|
|
*/
|
|
- public Map<String, Integer> getNodeMapToUse(int parallelism) {
|
|
|
|
|
|
+ public int getRestParallelism() {
|
|
List<NodeModel> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
|
|
List<NodeModel> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
|
|
- log.info("预设并行度的节点列表为:" + initialNodeList);
|
|
|
|
// 遍历所有节点,获取还有剩余并行度的节点
|
|
// 遍历所有节点,获取还有剩余并行度的节点
|
|
List<NodeModel> restNodeList = new ArrayList<>(); // 剩余并行度的节点列表
|
|
List<NodeModel> restNodeList = new ArrayList<>(); // 剩余并行度的节点列表
|
|
for (NodeModel kubernetesNodeSource : initialNodeList) {
|
|
for (NodeModel kubernetesNodeSource : initialNodeList) {
|
|
@@ -321,31 +419,8 @@ public class ProjectUtil {
|
|
restNodeList.add(kubernetesNodeCopy);
|
|
restNodeList.add(kubernetesNodeCopy);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- log.info("剩余并行度的节点列表为:" + restNodeList);
|
|
|
|
- Map<String, Integer> resultNodeMap = new HashMap<>(); // 用于执行的节点映射(节点名,并行度)
|
|
|
|
- if (!CollectionUtil.isEmpty(restNodeList)) {
|
|
|
|
- if (restNodeList.size() == 1) {
|
|
|
|
- NodeModel tempNode = restNodeList.get(0);
|
|
|
|
- String tempNodeName = tempNode.getHostname();
|
|
|
|
- int tempParallelism = tempNode.getParallelism();
|
|
|
|
- resultNodeMap.put(tempNodeName, Math.min(tempParallelism, parallelism));
|
|
|
|
- }
|
|
|
|
- if (restNodeList.size() > 1) {
|
|
|
|
- for (int i = 0; i < parallelism; i++) {
|
|
|
|
- // 每次降序排序都取剩余并行度最大的一个。
|
|
|
|
- restNodeList.sort((o1, o2) -> o2.getParallelism() - o1.getParallelism());
|
|
|
|
- NodeModel tempNode = restNodeList.get(0);
|
|
|
|
- String tempNodeName = tempNode.getHostname();
|
|
|
|
- int tempParallelism = tempNode.getParallelism();
|
|
|
|
- if (tempParallelism > 0) {
|
|
|
|
- tempNode.setParallelism(tempParallelism - 1);
|
|
|
|
- CollectionUtil.addValueToMap(resultNodeMap, 1, tempNodeName);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- log.info("即将使用节点的并行度为:" + resultNodeMap);
|
|
|
|
- return resultNodeMap;
|
|
|
|
|
|
+ log.info(" 集群剩余并行度为:" + restNodeList);
|
|
|
|
+ return restNodeList.size() == 0 ? 0 : restNodeList.stream().mapToInt(NodeModel::getParallelism).sum();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -567,4 +642,14 @@ public class ProjectUtil {
|
|
throw new RuntimeException("未知项目类型:" + projectType);
|
|
throw new RuntimeException("未知项目类型:" + projectType);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ public String getUserIdByProjectIdAndProjectType(String projectId, String projectType) {
|
|
|
|
+ if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
|
+ return manualProjectMapper.selectCreateUserById(projectId);
|
|
|
|
+ } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
|
|
+ return autoSubProjectMapper.selectCreateUserById(projectId);
|
|
|
|
+ } else {
|
|
|
|
+ throw new RuntimeException("未知项目类型:" + projectType);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|