|
@@ -142,11 +142,12 @@ public class ProjectUtil {
|
|
*
|
|
*
|
|
* @return 节点映射(节点名,并行度)
|
|
* @return 节点映射(节点名,并行度)
|
|
*/
|
|
*/
|
|
- public Map<String, Integer> getNodeMapToUse(long parallelism) {
|
|
|
|
|
|
+ public Map<String, Integer> getNodeMapToUse(int parallelism) {
|
|
List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
|
|
List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
|
|
log.info("ProjectUtil--getNodeMapToUse 预设并行度的节点列表为:" + initialNodeList);
|
|
log.info("ProjectUtil--getNodeMapToUse 预设并行度的节点列表为:" + initialNodeList);
|
|
// 遍历所有节点,获取还有剩余并行度的节点
|
|
// 遍历所有节点,获取还有剩余并行度的节点
|
|
List<KubernetesNodeTO> restNodeList = new ArrayList<>(); // 剩余并行度的节点列表
|
|
List<KubernetesNodeTO> restNodeList = new ArrayList<>(); // 剩余并行度的节点列表
|
|
|
|
+ int restParallelismSum = 0;
|
|
for (KubernetesNodeTO kubernetesNodeSource : initialNodeList) {
|
|
for (KubernetesNodeTO kubernetesNodeSource : initialNodeList) {
|
|
KubernetesNodeTO kubernetesNodeCopy = kubernetesNodeSource.clone();
|
|
KubernetesNodeTO kubernetesNodeCopy = kubernetesNodeSource.clone();
|
|
String nodeName = kubernetesNodeCopy.getName(); // 节点名称
|
|
String nodeName = kubernetesNodeCopy.getName(); // 节点名称
|
|
@@ -163,21 +164,31 @@ public class ProjectUtil {
|
|
kubernetesNodeCopy.setMaxParallelism(restParallelism);
|
|
kubernetesNodeCopy.setMaxParallelism(restParallelism);
|
|
}
|
|
}
|
|
if (restParallelism > 0) {
|
|
if (restParallelism > 0) {
|
|
|
|
+ restParallelismSum += restParallelism;
|
|
restNodeList.add(kubernetesNodeCopy);
|
|
restNodeList.add(kubernetesNodeCopy);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
log.info("ProjectUtil--getNodeMapToUse 剩余并行度的节点列表为:" + restNodeList);
|
|
log.info("ProjectUtil--getNodeMapToUse 剩余并行度的节点列表为:" + restNodeList);
|
|
Map<String, Integer> resultNodeMap = new HashMap<>(); // 用于执行的节点映射(节点名,并行度)
|
|
Map<String, Integer> resultNodeMap = new HashMap<>(); // 用于执行的节点映射(节点名,并行度)
|
|
if (!CollectionUtil.isEmpty(restNodeList)) {
|
|
if (!CollectionUtil.isEmpty(restNodeList)) {
|
|
- for (int i = 0; i < parallelism; i++) {
|
|
|
|
- // 每次降序排序都取剩余并行度最大的一个。
|
|
|
|
- restNodeList.sort((o1, o2) -> o2.getMaxParallelism() - o1.getMaxParallelism());
|
|
|
|
|
|
+
|
|
|
|
+ if (restNodeList.size() == 1) {
|
|
KubernetesNodeTO tempNode = restNodeList.get(0);
|
|
KubernetesNodeTO tempNode = restNodeList.get(0);
|
|
String tempNodeName = tempNode.getName();
|
|
String tempNodeName = tempNode.getName();
|
|
int tempParallelism = tempNode.getMaxParallelism();
|
|
int tempParallelism = tempNode.getMaxParallelism();
|
|
- if (tempParallelism > 0) {
|
|
|
|
- tempNode.setMaxParallelism(tempParallelism - 1);
|
|
|
|
- resultNodeMap.put(tempNodeName, resultNodeMap.get(tempNodeName) + 1);
|
|
|
|
|
|
+ resultNodeMap.put(tempNodeName, Math.min(tempParallelism, parallelism));
|
|
|
|
+ }
|
|
|
|
+ if (restNodeList.size() > 1) {
|
|
|
|
+ for (int i = 0; i < parallelism; i++) {
|
|
|
|
+ // 每次降序排序都取剩余并行度最大的一个。
|
|
|
|
+ restNodeList.sort((o1, o2) -> o2.getMaxParallelism() - o1.getMaxParallelism());
|
|
|
|
+ KubernetesNodeTO tempNode = restNodeList.get(0);
|
|
|
|
+ String tempNodeName = tempNode.getName();
|
|
|
|
+ int tempParallelism = tempNode.getMaxParallelism();
|
|
|
|
+ if (tempParallelism > 0) {
|
|
|
|
+ tempNode.setMaxParallelism(tempParallelism - 1);
|
|
|
|
+ resultNodeMap.put(tempNodeName, resultNodeMap.get(tempNodeName) + 1);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|