root 2 роки тому
батько
коміт
6b6ad90f4b

+ 29 - 0
api-common/src/main/java/api/common/util/FileUtil.java

@@ -597,6 +597,35 @@ public class FileUtil {
         return result;
     }
 
+    /**
+     * Lists the absolute paths of all regular files located at or beneath {@code rootPath}.
+     * <p>
+     * If {@code rootPath} denotes a regular file, the result holds exactly that file's absolute
+     * path. If it denotes a directory, the directory tree is walked recursively and every regular
+     * file found is reported; directories themselves are not part of the result.
+     *
+     * @param rootPath path (relative or absolute) of the file or directory to inspect
+     * @return a new mutable list of absolute file paths; empty when the directory holds no files
+     * @throws Exception if {@code rootPath} does not exist
+     */
+    public static List<String> listAbsolutePath(String rootPath) throws Exception {
+        File root = new File(rootPath);
+        // 1. A missing root is reported to the caller instead of silently yielding an empty list.
+        if (!root.exists()) {
+            throw new Exception("文件" + root.getAbsolutePath() + "不存在!");
+        }
+        List<String> result = new ArrayList<>();
+        // 2. A regular file is its own single result; always hand back the absolute form so the
+        //    method honours its name even when the caller passed a relative path.
+        if (root.isFile()) {
+            result.add(root.getAbsolutePath());
+            return result;
+        }
+        // 3. Otherwise descend into the children. listFiles() yields null when the path is not a
+        //    readable directory, in which case the result stays empty.
+        File[] children = root.listFiles();
+        if (children != null) {
+            for (File child : children) {
+                result.addAll(listAbsolutePath(child.getAbsolutePath()));
+            }
+        }
+        return result;
+    }
+
     /**
      * 获取所有同一类型的文件列表。
      *

+ 3 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -218,10 +218,10 @@ public class ProjectConsumer {
         VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
         List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
         List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
-        // -------------------------------- 3 发送任务消息 --------------------------------
-        projectService.sendTaskMessage(realCurrentParallelism, projectRunningKey, userId, projectId, projectType, videoTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
-        // -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
+        // -------------------------------- 3 算法导入(一期按单机版做) --------------------------------
         String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
+        // -------------------------------- 4 发送任务消息 --------------------------------
+        projectService.sendTaskMessage(realCurrentParallelism, projectRunningKey, userId, projectId, projectType, nodeMap, algorithmDockerImage, videoTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
         projectService.createPod(projectId, nodeMap, algorithmDockerImage);
 

+ 14 - 6
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/ProjectManager.java

@@ -32,20 +32,28 @@ public class ProjectManager {
      * @param algorithmDockerImage
      */
     @SneakyThrows
-    public void createTempYaml(String projectId, String algorithmDockerImage, String nodeName) {
+    public void createTempYaml(String projectId,
+                               String algorithmDockerImage,
+                               String nodeName,
+                               int kafkaPartition,
+                               long kafkaOffset
+
+    ) { // fills the placeholders of the pod template and writes the result as a YAML file
         String podName = projectUtil.getRandomPodName(projectId);   // generate the pod name
-        String podTemplateFileNameOfProject = projectUtil.getPodYamlName(nodeName, podName);     // 模板文件名称
+        String podYaml = projectUtil.getPodYamlName(nodeName, podName);     // file name of the YAML written below
         String podString = FileUtil.read(new File(podTemplateYaml)); // raw text of the pod template
         String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
         String replace1 = replace0.replace("vtd-image", kubernetesConfiguration.getVtdImage());
         String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
         String replace3 = replace2.replace("algorithm-image", algorithmDockerImage);
         String replace4 = replace3.replace("kafkaTopic", projectId);     // the topic placeholder is filled with the projectId
-        String replace5 = replace4.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
-        String replace6 = replace5.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
-        String replace7 = replace6.replace("node-name", nodeName);     // 指定 pod 运行节点
+        String replace5 = replace4.replace("kafkaPartition", kafkaPartition + "");     // partition placeholder, filled with kafkaPartition as text
+        String replace6 = replace5.replace("kafkaOffset", kafkaOffset + "");     // offset placeholder, filled with kafkaOffset as text
+        String replace7 = replace6.replace("pod-name", podName); // pod name (per the original note: projectId plus a random string - confirm in getRandomPodName)
+        String replace8 = replace7.replace("namespace-name", kubernetesConfiguration.getNamespace()); // namespace taken from the Kubernetes configuration
+        String replace9 = replace8.replace("node-name", nodeName);     // node the pod is assigned to
 //        log.info("ProjectService--createPod 在节点 " + nodeName + " 开始执行 pod:" + tempPodString);
-        FileUtil.writeStringToLocalFile(replace7, podYamlDirectory + podTemplateFileNameOfProject);
+        FileUtil.writeStringToLocalFile(replace9, podYamlDirectory + podYaml); // the pod itself is not started here (see the disabled call below)
 //        log.info("ProjectService--createPod 在节点 " + nodeName + " 开始执行 pod。");
 //        projectUtil.createPod(nodeName, podName, tempPodString);
     }

+ 15 - 13
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -142,20 +142,21 @@ public class TaskManager {
                 ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()), ProjectMessageDTO.class);
                 int taskTotal = projectMessageDTO.getTaskTotal();
                 int taskCompleted = projectMessageDTO.getTaskCompleted();
-                Integer currentParallelism = projectMessageDTO.getCurrentParallelism();
                 log.info("TaskManager--isProjectCompleted 项目 " + projectId + " 完成进度为:" + (taskCompleted + 1) + "/" + taskTotal);
                 if (taskCompleted + 1 == taskTotal) {
                     return true;
                 } else {    // 项目没有完成
                     projectMessageDTO.setTaskCompleted(taskCompleted + 1);  // 增加已完成任务数
                     stringRedisTemplate.opsForValue().set(redisPrefix.getProjectRunningKey(), JsonUtil.beanToJson(projectMessageDTO));
-                    if (taskTotal - taskCompleted <= currentParallelism) {
-                        // 如果 taskTotal - taskCompleted 小于 currentParallelism ,则不启动下一个 pod,删除当前 pod 并归还一个并行度
-                        projectUtil.deletePod(podName);
-                        projectUtil.addOneParallelismToNode(nodeName);
-                    } else {
-                        projectUtil.createNextPod(projectId, nodeName, podName);  // 删除上一个 pod,并启动下一个 pod
-                    }
+//                    int currentParallelism = projectMessageDTO.getCurrentParallelism();
+//                    if (taskTotal - taskCompleted <= currentParallelism) {
+//                        // 如果 taskTotal - taskCompleted 小于 currentParallelism ,则不启动下一个 pod,删除当前 pod 并归还一个并行度
+//                        projectUtil.deletePod(podName);
+//                        projectUtil.addOneParallelismToNode(nodeName);
+//                    } else {
+//                        projectUtil.createNextPod(projectId, nodeName, podName);  // 删除上一个 pod,并启动下一个 pod
+//                    }
+                    projectUtil.createNextPod2(projectId, nodeName, podName);
                     return false;
                 }
             } catch (Exception exception) {
@@ -212,14 +213,16 @@ public class TaskManager {
             log.info("TaskService--state 开始执行对第 " + (i + 1) + " 个叶子节点 " + indexId + " 进行打分!");
 
             String ruleFilePath = pyPath + "scripts/" + ruleName.split("_")[0] + "/" + ruleName + ".py";
-            log.info("TaskService--state 将叶子节点 " + indexId + " 对应的打分规则保存到临时目录:" + ruleFilePath);
             FileUtil.writeInputStreamToLocalFile(IoUtil.stringToInputStream(ruleDetails), ruleFilePath);
+            log.info("TaskService--state 将叶子节点 " + indexId + " 对应的打分规则保存到临时目录:" + ruleFilePath);
             List<TaskPO> taskListOfLeafIndex = taskList.stream()
                     .filter(task -> indexId.equals(task.getLastTargetId()))
                     .collect(Collectors.toList());
-            log.info("TaskService--state 叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个任务!");
+            log.info("TaskService--state 叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个任务:" + taskListOfLeafIndex);
             // 计算叶子指标的得分
-            taskListOfLeafIndex.forEach(taskOfLeaf -> {
+            // 使用 stream 流会出现无法进入循环的情况
+            // taskListOfLeafIndex.forEach(taskOfLeaf -> {});
+            for (TaskPO taskOfLeaf : taskListOfLeafIndex) {
                 String task2Id = taskOfLeaf.getId();
                 try {
                     String runState = taskOfLeaf.getRunState();
@@ -273,8 +276,7 @@ public class TaskManager {
                     log.error("TaskManager--score 任务 " + task2Id + " 打分出错!", e.getMessage());
                     // 如果打分失败则开始下一个打分
                 }
-            });
-
+            }
 
             // 计算不合格的任务数(不到100分就是不合格,执行失败的不算)
             // 计算叶子指标下任务得分总和

+ 26 - 8
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -6,6 +6,7 @@ import api.common.util.*;
 import com.css.simulation.resource.scheduler.configuration.docker.DockerConfiguration;
 import com.css.simulation.resource.scheduler.configuration.git.GitConfiguration;
 import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
+import com.css.simulation.resource.scheduler.manager.ProjectManager;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.*;
@@ -102,6 +103,8 @@ public class ProjectService {
     KubernetesConfiguration kubernetesConfiguration;
     @Resource
     GitConfiguration gitConfiguration;
+    @Resource
+    ProjectManager projectManager;
 
 
     // -------------------------------- Comment --------------------------------
@@ -173,6 +176,9 @@ public class ProjectService {
      * @param projectRunningPrefix
      * @param userId
      * @param projectId
+     * @param projectType
+     * @param nodeMap              节点
+     * @param algorithmDockerImage 算法
      * @param videoTime            视频长度
      * @param scenePOSet
      * @param vehiclePO
@@ -180,8 +186,19 @@ public class ProjectService {
      * @param ogtPOList
      */
     @SneakyThrows
-    public void sendTaskMessage(int parallelism, String projectRunningPrefix, String userId, String projectId, String projectType, Long videoTime, Set<ScenePO> scenePOSet, VehiclePO vehiclePO, List<CameraPO> cameraPOList, List<OgtPO> ogtPOList) {
-
+    public void sendTaskMessage(int parallelism,
+                                String projectRunningPrefix,
+                                String userId,
+                                String projectId,
+                                String projectType,
+                                Map<String, Integer> nodeMap,
+                                String algorithmDockerImage,
+                                Long videoTime,
+                                Set<ScenePO> scenePOSet,
+                                VehiclePO vehiclePO,
+                                List<CameraPO> cameraPOList,
+                                List<OgtPO> ogtPOList) {
+        Map<String, Integer> nodeMapToCount = projectUtil.getNodeMapToCount(nodeMap);
         final int[] messageNumber = CollectionUtil.createIntArray(0);
         ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, parallelism, (short) 1);   // 创建主题
         log.info("ProjectService--sendTaskMessage 项目 " + projectId + " 获得的包括的场景信息为:" + scenePOSet);
@@ -227,14 +244,14 @@ public class ProjectService {
                 String osgbName = splitOsgb[splitOsgb.length - 1];
                 try {
                     String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
-                    String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId+".xodr";
+                    String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + ".xodr";
                     String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
-                    String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId+".osgb";
+                    String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + ".osgb";
                     MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
-                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
                     MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
-                    MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
                     log.info("ProjectService--sendTaskMessage 已经将 xodr 上传到 minio 的结果文件目录:" + xodrPathOfMinio);
+                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
+                    MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
                     log.info("ProjectService--sendTaskMessage 已经将 osgb 上传到 minio 的结果文件目录:" + osgbPathOfMinio);
                 } catch (IOException | ServerException | InsufficientDataException | ErrorResponseException |
                          NoSuchAlgorithmException | InvalidKeyException | InvalidResponseException |
@@ -305,8 +322,9 @@ public class ProjectService {
                     long offset = success.getRecordMetadata().offset();
                     log.info("------- ProjectConsumer 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset + " 消息体为:" + finalTaskJson);
                     //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
-
-
+                    // 选一个count最少的node
+                    String leastCountNodeName = projectUtil.getLeastCountNodeOfNodeMapToCount(nodeMapToCount);
+                    projectManager.createTempYaml(projectId, algorithmDockerImage, leastCountNodeName, partition, offset);
 
 
                 }, failure -> log.error("------- 发送消息失败:" + failure.getMessage()));

+ 64 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -23,6 +23,7 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -59,6 +60,29 @@ public class ProjectUtil {
     @Resource
     ApiClient apiClient;
 
+
+    /**
+     * Selects the node with the smallest assignment count and records one more assignment on it.
+     * <p>
+     * The chosen node's counter in {@code nodeMapToCount} is incremented before returning, so
+     * repeated calls spread assignments across the nodes instead of always yielding the same one.
+     * When several nodes share the smallest count, the first one in the map's iteration order wins.
+     * The method is {@code synchronized} so the read-then-increment step is not interleaved between
+     * callers going through this instance.
+     *
+     * @param nodeMapToCount mutable map from node name to the number of assignments made so far;
+     *                       values must not be {@code null}
+     * @return the name of the selected node, or an empty string when the map is empty
+     */
+    public synchronized String getLeastCountNodeOfNodeMapToCount(Map<String, Integer> nodeMapToCount) {
+        String leastCountNodeName = "";
+        int leastCount = Integer.MAX_VALUE;
+        boolean found = false;
+        for (Map.Entry<String, Integer> entry : nodeMapToCount.entrySet()) {
+            int count = entry.getValue();
+            // Strict comparison keeps the first node seen among equally loaded ones.
+            if (!found || count < leastCount) {
+                leastCount = count;
+                leastCountNodeName = entry.getKey();
+                found = true;
+            }
+        }
+        if (found) {
+            // Record the assignment so the next call prefers a different, less loaded node.
+            nodeMapToCount.put(leastCountNodeName, leastCount + 1);
+        }
+        return leastCountNodeName;
+    }
+
+    /**
+     * Builds a fresh counter map holding a zero entry for every node that has capacity.
+     * <p>
+     * Nodes whose parallelism is zero or negative are left out of the result; the parallelism
+     * value itself is not carried over.
+     *
+     * @param nodeMap map from node name to its parallelism; values must not be {@code null}
+     * @return a new {@link HashMap} mapping each node with positive parallelism to {@code 0}
+     */
+    public Map<String, Integer> getNodeMapToCount(Map<String, Integer> nodeMap) {
+        Map<String, Integer> countByNode = new HashMap<>();
+        for (Map.Entry<String, Integer> node : nodeMap.entrySet()) {
+            int parallelism = node.getValue();
+            if (parallelism > 0) {
+                countByNode.put(node.getKey(), 0);
+            }
+        }
+        return countByNode;
+    }
+
     /**
      * 判断算法是否已经导入
      *
@@ -127,10 +151,41 @@ public class ProjectUtil {
         String lastPodString = FileUtil.read(podYamlDirectory + getPodYamlName(nodeName, lastPodName));
         String nextPodName = getRandomPodName(projectId);
         String nextPodString = lastPodString.replace(lastPodName, nextPodName); // pod 名称包括 projectId 和 随机字符串
-        log.info("TaskManager--createNextPod 创建项目 " + projectId + " 的下一个 pod。");
+        log.info("ProjectUtil--createNextPod 创建项目 " + projectId + " 的下一个 pod。");
         createPod(nodeName, nextPodName, nextPodString);
     }
 
+    /**
+     * Replaces a finished pod with the next prepared pod of the same node, if one exists.
+     * <p>
+     * The finished pod is deleted, its YAML file(s) under {@code podYamlDirectory} are removed,
+     * and the first remaining YAML file whose path contains {@code nodeName} is applied. When no
+     * such file is left, one unit of parallelism is handed back to the node instead.
+     *
+     * @param projectId   project id (used for logging only)
+     * @param nodeName    name of the node the pod runs on
+     * @param lastPodName name of the pod to delete
+     */
+    @SneakyThrows
+    public void createNextPod2(String projectId, String nodeName, String lastPodName) {
+        // 1. Delete the finished pod exactly once.
+        deletePod(lastPodName);
+
+        // 2. Walk the YAML directory once: drop the finished pod's YAML and remember the first
+        //    other YAML belonging to this node. Searching for the next file must NOT require
+        //    lastPodName in the path, because those are precisely the files being removed.
+        // NOTE(review): candidates are matched by nodeName only; if podYamlDirectory can hold
+        // YAML files of several projects at once, this may need to be scoped by projectId too.
+        String nextPodYamlPath = null;
+        for (String absolutePath : FileUtil.listAbsolutePath(podYamlDirectory)) {
+            if (!absolutePath.contains(nodeName)) {
+                continue;
+            }
+            if (absolutePath.contains(lastPodName)) {
+                FileUtil.rm(absolutePath);
+            } else if (nextPodYamlPath == null) {
+                nextPodYamlPath = absolutePath;
+            }
+        }
+
+        // 3. Start the next pod, or give the parallelism slot back when nothing is queued.
+        if (nextPodYamlPath != null) {
+            createPod2(nextPodYamlPath);
+            log.info("ProjectUtil--createNextPod 创建项目 " + projectId + " 的下一个 pod。");
+            return;
+        }
+        addOneParallelismToNode(nodeName);
+    }
+
 
     /**
      * @param nodeName       节点名称
@@ -150,6 +205,14 @@ public class ProjectUtil {
 //        KubernetesUtil.createPod(apiClient, kubernetesNamespace, v1Pod);
     }
 
+    /**
+     * Applies a pod YAML file by delegating to {@code KubernetesUtil.applyYaml}, passing this
+     * component's {@code hostname}, {@code username} and {@code password} along with the path.
+     *
+     * @param podYamlPath path of the pod YAML file to apply (a path, not the file's content)
+     */
+    @SneakyThrows
+    public void createPod2(String podYamlPath) {
+        KubernetesUtil.applyYaml(hostname, username, password, podYamlPath);
+    }
+
 
     public String getProjectTypeByProjectId(String projectId) {
         String projectType = null;