root hai 2 anos
pai
achega
31b496ada0

+ 11 - 12
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -504,18 +504,14 @@ public class ProjectConsumer {
         projectMessageDTO.setTaskCompleted(0);
 
         // 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
-        Map<String, Integer> nodeMap;
-        if (currentParallelism < taskTotal) {
-            nodeMap = projectUtil.getNodeMapToUse(currentParallelism);
-        } else {
-            nodeMap = projectUtil.getNodeMapToUse(taskTotal);
-        }
-        // 将指定 node 的并行度减少
+        //1 获取剩余并行度和即将使用的各node的并行度
+        Map<String, Integer> nodeMap0 = projectUtil.getNodeMap();
+        Map<String, Integer> nodeMap = projectUtil.getNodeMapToUse(Math.min(currentParallelism, taskTotal));
+        //2 将指定 node 的并行度减少
         nodeMap.keySet().forEach(nodeName -> {
             int parallelismToUse = nodeMap.get(nodeName);
             String restParallelismKey = "node:" + nodeName + ":parallelism";
-            int restParallelism = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue()
-                    .get(restParallelismKey))); // 剩余可用并行度
+            int restParallelism = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(restParallelismKey))); // 剩余可用并行度
             stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelismToUse) + "");
         });
         // 重新设置实际使用的并行度并保存到 redis
@@ -530,7 +526,7 @@ public class ProjectConsumer {
         log.info("parseProject() 项目 " + projectId + " 运行在:" + nodeListToCount);
         int messageNumber = 0;
         ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1);   // 创建主题
-        TimeUnit.SECONDS.sleep(6);
+        TimeUnit.SECONDS.sleep(7);
         // 需要即时启动的任务(并行度的大小)
         CopyOnWriteArrayList<String> yamlListToRun = new CopyOnWriteArrayList<>();
         for (String taskJsonPath : taskJsonList) {
@@ -566,9 +562,12 @@ public class ProjectConsumer {
                 }
             }
             currentNodeTO.setCount(currentNodeTO.getCount() + 1);
+            // 获取 cpu 编号
+            int cpuOrder = nodeMap0.get(currentNodeName) - 1;
+            nodeMap0.put(currentNodeName, cpuOrder);
 
-            log.info("parseProject() 项目 " + projectId + " 准备创建 yaml:是否使用 gpu " + isChoiceGpu + ",当前节点名称为:" + currentNodeName + ",当前节点已创建 yaml 个数为:" + currentCount);
-            String tempYaml = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu);
+            log.info("parseProject() 项目 " + projectId + " 准备创建 yaml:是否使用 gpu " + isChoiceGpu + ",当前节点名称为:" + currentNodeName + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前 cpu 编号为:" + cpuOrder);
+            String tempYaml = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
             if (currentCount == 0) {
                 log.info("parseProject() 加入到启动列表 " + tempYaml);
                 yamlListToRun.add(tempYaml);

+ 14 - 9
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/ProjectManager.java

@@ -49,7 +49,8 @@ public class ProjectManager {
                                  String nodeName,
                                  int kafkaPartition,
                                  long kafkaOffset,
-                                 String isChoiceGpu
+                                 String isChoiceGpu,
+                                 int cpuOrder
 
     ) {
         String podName = projectUtil.getRandomPodName(projectId);   // 生成 podName
@@ -73,16 +74,18 @@ public class ProjectManager {
             String replace12 = replace11.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
             String replace13 = replace12.replace("node-name", nodeName);     // 指定 pod 运行节点
 
+            String replace14 = replace13.replace("cpu-order", nodeName);     // 指定 cpu 编号
+
             String finalYaml = null;
             if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
                 log.info("项目 " + projectId + " 使用 gpu 生成视频");
-                String replace14 = replace13.replace("vtd-image", kubernetesConfiguration.getImageVtdGpu());
-                finalYaml = replace14.replace("vtd-command", kubernetesConfiguration.getCommandVtdGpu());
+                String replace15 = replace14.replace("vtd-image", kubernetesConfiguration.getImageVtdGpu());
+                finalYaml = replace15.replace("vtd-command", kubernetesConfiguration.getCommandVtdGpu());
             }
             if (DictConstants.NOT_USE_GPU.equals(isChoiceGpu)) {
                 log.info("项目 " + projectId + " 不使用 gpu 生成视频");
-                String replace14 = replace13.replace("vtd-image", kubernetesConfiguration.getImageVtdNogpu());
-                finalYaml = replace14.replace("vtd-command", kubernetesConfiguration.getCommandVtdNogpu());
+                String replace15 = replace14.replace("vtd-image", kubernetesConfiguration.getImageVtdNogpu());
+                finalYaml = replace15.replace("vtd-command", kubernetesConfiguration.getCommandVtdNogpu());
             }
             log.info("保存项目 " + projectId + " 的 yaml 文件:" + podYamlDirectory + podYaml);
             FileUtil.writeStringToLocalFile(finalYaml, podYamlDirectory + podYaml);
@@ -112,17 +115,19 @@ public class ProjectManager {
             String replace17 = replace16.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
             String replace18 = replace17.replace("node-name", nodeName);     // 指定 pod 运行节点
 
+            String replace19 = replace18.replace("cpu-order", nodeName);     // 指定 cpu 编号
+
             String finalYaml;
             if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
                 log.info("项目 " + projectId + " 使用 gpu 生成视频");
                 log.info("createTempYaml() k8s参数为:" + kubernetesConfiguration);
                 log.info("createTempYaml() yaml模板为:" + replace12);
-                String replace19 = replace18.replace("vtd-image", kubernetesConfiguration.getImageVtdGpu());
-                finalYaml = replace19.replace("vtd-command", kubernetesConfiguration.getCommandVtdCarsimGpu());
+                String replace20 = replace19.replace("vtd-image", kubernetesConfiguration.getImageVtdGpu());
+                finalYaml = replace20.replace("vtd-command", kubernetesConfiguration.getCommandVtdCarsimGpu());
             } else if (DictConstants.NOT_USE_GPU.equals(isChoiceGpu)) {
                 log.info("项目 " + projectId + " 不使用 gpu 生成视频");
-                String replace19 = replace18.replace("vtd-image", kubernetesConfiguration.getImageVtdNogpu());
-                finalYaml = replace19.replace("vtd-command", kubernetesConfiguration.getCommandVtdCarsimNogpu());
+                String replace20 = replace19.replace("vtd-image", kubernetesConfiguration.getImageVtdNogpu());
+                finalYaml = replace20.replace("vtd-command", kubernetesConfiguration.getCommandVtdCarsimNogpu());
             } else {
                 throw new RuntimeException("createTempYaml() 是否使用 gpu:" + isChoiceGpu);
             }

+ 2 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -237,8 +237,7 @@ public class ProjectUtil {
 
 
     /**
-     * 根据并行度获取节点列表以及剩余可用并行度
-     * 根据剩余可用并行度降序排序
+     * 节点剩余可用并行度列表
      *
      * @return 节点映射(节点名,并行度)
      */
@@ -261,7 +260,7 @@ public class ProjectUtil {
             }
             resultNodeMap.put(nodeName, restParallelism);
         }
-        log.info("ProjectUtil--getNodeMapToUse 剩余并行度的节点列表为:" + resultNodeMap);
+        log.info("getNodeMap() 剩余并行度的节点列表为:" + resultNodeMap);
         return resultNodeMap;
     }