|
@@ -411,7 +411,7 @@ public class ProjectService {
|
|
|
} else {
|
|
|
expandParallelism = waitingParallelism;
|
|
|
}
|
|
|
- expand(projectId, isChoiceGpu, parallelism, expandParallelism, expandParallelism == waitingParallelism);
|
|
|
+ expand(clusterUserId, modelType, projectId, isChoiceGpu, parallelism, expandParallelism, expandParallelism == waitingParallelism);
|
|
|
}
|
|
|
} else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
|
|
|
usingDynamicLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_DYNAMIC);
|
|
@@ -426,7 +426,7 @@ public class ProjectService {
|
|
|
} else {
|
|
|
expandParallelism = waitingParallelism;
|
|
|
}
|
|
|
- expand(projectId, isChoiceGpu, parallelism, expandParallelism, expandParallelism == waitingParallelism);
|
|
|
+ expand(clusterUserId, modelType, projectId, isChoiceGpu, parallelism, expandParallelism, expandParallelism == waitingParallelism);
|
|
|
}
|
|
|
} else {
|
|
|
throw new RuntimeException("未知模型类型:" + modelType);
|
|
@@ -505,7 +505,7 @@ public class ProjectService {
|
|
|
List<NodeEntity> nodeListToCount = projectDomainService.getNodeListToCount(nodeMap);
|
|
|
int messageNumber = 0;
|
|
|
KafkaUtil.createTopic(kafkaAdminClient, projectId, finalParallelism, (short) 1); // 创建主题
|
|
|
- TimeUnit.SECONDS.sleep(10);
|
|
|
+ TimeUnit.SECONDS.sleep(3);
|
|
|
// 需要即时启动的任务(并行度的大小)
|
|
|
ArrayList<String> yamlToRunRedisKeyList = new ArrayList<>();
|
|
|
for (String taskJsonPath : taskJsonList) {
|
|
@@ -557,7 +557,7 @@ public class ProjectService {
|
|
|
}
|
|
|
messageNumber++;
|
|
|
}
|
|
|
- TimeUnit.SECONDS.sleep(10);
|
|
|
+ TimeUnit.SECONDS.sleep(3);
|
|
|
log.debug("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
|
|
|
for (String redisKey : yamlToRunRedisKeyList) {
|
|
|
projectDomainService.createPodBegin(projectId, redisKey);
|
|
@@ -570,8 +570,11 @@ public class ProjectService {
|
|
|
|
|
|
|
|
|
@Synchronized
|
|
|
- public void expand(String projectId, String isChoiceGpu, int totalParallelism, int expandParallelism, boolean isDone) {
|
|
|
+ public void expand(String clusterUserId, String modelType, String projectId, String isChoiceGpu, int totalParallelism, int expandParallelism, boolean isDone) {
|
|
|
log.info("扩充项目 {} {} 个并行度", projectId, expandParallelism);
|
|
|
+ if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)) {
|
|
|
+ projectDomainService.useLicense(clusterUserId, modelType, expandParallelism);
|
|
|
+ }
|
|
|
//1 获取剩余并行度和即将使用的各node的并行度
|
|
|
Map<String, Integer> remainderNodeMap = projectDomainService.getRemainderNodeMap(isChoiceGpu);
|
|
|
log.info("剩余并行度为:" + remainderNodeMap);
|
|
@@ -595,7 +598,8 @@ public class ProjectService {
|
|
|
// 修改全部yaml
|
|
|
nodeMapToUse.forEach((nodeNameAfter, parallelismToUse) -> {
|
|
|
log.info("修改yaml执行节点 {} -> {}:{}", nodeNameBefore, nodeNameAfter, yamlPathCacheKeySetGroupByNodeName);
|
|
|
- int shareNum = yamlCount / totalParallelism * parallelismToUse;
|
|
|
+ int shareNum = (int) Math.ceil(yamlCount * 1.0 / totalParallelism * parallelismToUse);
|
|
|
+ log.info("需要分享的yaml个数为:{}", shareNum);
|
|
|
for (int i = 0; i < shareNum; i++) {
|
|
|
log.info("将yaml按比例均分给节点 {}", nodeNameAfter);
|
|
|
final String yamlPathCacheKeyBefore = yamlPathCacheKeySetGroupByNodeName.get(i);
|