|
@@ -32,8 +32,6 @@ import javax.annotation.Resource;
|
|
import java.util.*;
|
|
import java.util.*;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
import java.util.concurrent.FutureTask;
|
|
import java.util.concurrent.FutureTask;
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
-import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
|
|
@Component
|
|
@Component
|
|
@Slf4j
|
|
@Slf4j
|
|
@@ -102,8 +100,6 @@ public class ProjectConsumer {
|
|
String initialProjectJson = projectRecord.value();
|
|
String initialProjectJson = projectRecord.value();
|
|
ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(initialProjectJson, ProjectMessageDTO.class);
|
|
ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(initialProjectJson, ProjectMessageDTO.class);
|
|
String projectId = projectMessageDTO.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
String projectId = projectMessageDTO.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
- ProjectPO projectPO = projectUtil.getProjectByProjectId(projectId);
|
|
|
|
- projectMessageDTO.setIsChoiceGpu(projectPO.getIsChoiceGpu());
|
|
|
|
FutureTask<Integer> createTaskAndFixDataFutureTask = new FutureTask<>(() -> {
|
|
FutureTask<Integer> createTaskAndFixDataFutureTask = new FutureTask<>(() -> {
|
|
createTaskAndFixData(projectRecord);
|
|
createTaskAndFixData(projectRecord);
|
|
return 1024;
|
|
return 1024;
|
|
@@ -387,9 +383,12 @@ public class ProjectConsumer {
|
|
public void parseProject(ProjectMessageDTO projectMessageDTO, String projectWaitingKey, String projectRunningKey,
|
|
public void parseProject(ProjectMessageDTO projectMessageDTO, String projectWaitingKey, String projectRunningKey,
|
|
String userId) {
|
|
String userId) {
|
|
String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
- String isChoiceGpu = projectMessageDTO.getIsChoiceGpu();
|
|
|
|
|
|
+ ProjectPO projectPO = projectUtil.getProjectByProjectId(projectId);
|
|
|
|
+ log.info("项目 " + projectId + " 信息为:" + projectPO);
|
|
|
|
+ String isChoiceGpu = projectPO.getIsChoiceGpu();
|
|
// 项目类型
|
|
// 项目类型
|
|
int currentParallelism = projectMessageDTO.getCurrentParallelism(); // 当前并行度
|
|
int currentParallelism = projectMessageDTO.getCurrentParallelism(); // 当前并行度
|
|
|
|
+
|
|
// 场景测试包 id
|
|
// 场景测试包 id
|
|
// 结果视频的时长
|
|
// 结果视频的时长
|
|
// 模型配置 id
|
|
// 模型配置 id
|
|
@@ -427,9 +426,10 @@ public class ProjectConsumer {
|
|
String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
|
|
String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
|
|
// -------------------------------- 4 发送任务消息 --------------------------------
|
|
// -------------------------------- 4 发送任务消息 --------------------------------
|
|
List<NodeTO> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
|
|
List<NodeTO> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
|
|
|
|
+ log.info("项目 " + projectId + " 运行在:" + nodeListToCount);
|
|
int messageNumber = 0;
|
|
int messageNumber = 0;
|
|
ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1); // 创建主题
|
|
ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1); // 创建主题
|
|
- Thread.sleep(5000);
|
|
|
|
|
|
+// Thread.sleep(5000);
|
|
// 需要即时启动的任务(并行度的大小)
|
|
// 需要即时启动的任务(并行度的大小)
|
|
CopyOnWriteArrayList<String> yamlListToRun = new CopyOnWriteArrayList<>();
|
|
CopyOnWriteArrayList<String> yamlListToRun = new CopyOnWriteArrayList<>();
|
|
for (String taskJsonPath : taskJsonList) {
|
|
for (String taskJsonPath : taskJsonList) {
|
|
@@ -452,27 +452,27 @@ public class ProjectConsumer {
|
|
+ partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
|
|
+ partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
|
|
//4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
|
|
//4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
|
|
// 选一个count 最少的 node
|
|
// 选一个count 最少的 node
|
|
- AtomicReference<String> currentNodeName = new AtomicReference<>("");
|
|
|
|
- AtomicInteger currentCount = new AtomicInteger(Integer.MAX_VALUE);
|
|
|
|
|
|
+ String currentNodeName = "";
|
|
|
|
+ int currentCount = Integer.MAX_VALUE;
|
|
for (NodeTO nodeTO : nodeListToCount) {
|
|
for (NodeTO nodeTO : nodeListToCount) {
|
|
int tempCount = nodeTO.getCount();
|
|
int tempCount = nodeTO.getCount();
|
|
String tempNodeName = nodeTO.getNodeName();
|
|
String tempNodeName = nodeTO.getNodeName();
|
|
- if (tempCount < currentCount.get()) {
|
|
|
|
- currentCount.set(tempCount);
|
|
|
|
- currentNodeName.set(tempNodeName);
|
|
|
|
|
|
+ if (tempCount < currentCount) {
|
|
|
|
+ currentCount = tempCount;
|
|
|
|
+ currentNodeName = tempNodeName;
|
|
nodeTO.setCount(tempCount + 1);
|
|
nodeTO.setCount(tempCount + 1);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- String currentNodeNameValue = currentNodeName.get();
|
|
|
|
- int currentCountValue = currentCount.get();
|
|
|
|
- String tempYaml = projectManager.createTempYaml(projectId, algorithmDockerImage
|
|
|
|
- , currentNodeNameValue, partition, offset,isChoiceGpu);
|
|
|
|
- if (currentCountValue == 0) {
|
|
|
|
|
|
+
|
|
|
|
+ log.info("项目 " + projectId + " 准备创建 yaml:是否使用 gpu " + isChoiceGpu + ",当前节点名称为:" + currentNodeName + ",当前节点已创建 yaml 个数为:" + currentCount);
|
|
|
|
+ String tempYaml = projectManager.createTempYaml(projectId, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu);
|
|
|
|
+ if (currentCount == 0) {
|
|
|
|
+ log.info("加入到启动列表 " + tempYaml);
|
|
yamlListToRun.add(tempYaml);
|
|
yamlListToRun.add(tempYaml);
|
|
}
|
|
}
|
|
messageNumber++;
|
|
messageNumber++;
|
|
}
|
|
}
|
|
- Thread.sleep(5000);
|
|
|
|
|
|
+ Thread.sleep(5);
|
|
log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息。");
|
|
log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息。");
|
|
log.info("项目 " + projectId + " 准备首先启动 " + yamlListToRun);
|
|
log.info("项目 " + projectId + " 准备首先启动 " + yamlListToRun);
|
|
for (String yaml : yamlListToRun) {
|
|
for (String yaml : yamlListToRun) {
|