root 2 жил өмнө
parent
commit
7c80be9273

+ 51 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/util/ProjectUtil.java

@@ -111,7 +111,9 @@ public class ProjectUtil {
             RedisUtil.deleteByKey(stringRedisTemplate, "pod:" + podName + ":node");
             RedisUtil.deleteByKey(stringRedisTemplate, "pod:" + podName + ":cpu");
             KubernetesUtil.deletePod(apiClient, kubernetesConfiguration.getNamespace(), podName);
-            TimeUnit.SECONDS.sleep(10); // 暂停 10 秒等待资源完全释放。
+
+            log.info("deletePod() 等待 pod " + podName + " 的资源释放完成。");
+            TimeUnit.SECONDS.sleep(10);
             log.info("deletePod() 删除 pod 并删除 redis 键值对:" + key);
         } catch (Exception e) {
             e.printStackTrace();
@@ -164,6 +166,38 @@ public class ProjectUtil {
         addOneParallelismToNode(nodeName);
     }
 
+    /**
+     * 更改一个名字继续启动
+     *
+     * @param projectId   项目 id
+     * @param nodeName    运行 pod 的节点名称
+     * @param lastPodName 即将删除的 pod 名称
+     */
+    @SneakyThrows
+    public void createNextPod3(String projectId, String nodeName, String lastPodName) {
+        //1 删除上一个 pod 和 redis 键值对 和 旧的 yaml 文件
+        deletePod(lastPodName);
+        //2 获取新的 yaml 信息
+        final Set<String> yamlPathCacheKeySet = RedisUtil.getKeySetByPrefix(stringRedisTemplate, "project:" + projectId + ":node:" + nodeName + ":yaml");
+        if (CollectionUtil.isEmpty(yamlPathCacheKeySet)) {
+            // 如果当前节点没有下一个yaml,则返回一个并行度。
+            log.info("createNextPod3() 节点 " + nodeName + " 已经执行完被分配的项目 " + projectId + " 的所有 pod。");
+            addOneParallelismToNode(nodeName);
+        } else {
+            final String yamlPathCacheKey = new ArrayList<>(yamlPathCacheKeySet).get(0);
+            final String absolutePath = stringRedisTemplate.opsForValue().get(yamlPathCacheKey);
+            // 修改 cpu 编号
+            String cpuOrderString = stringRedisTemplate.opsForValue().get("pod:" + lastPodName + ":cpu");
+            Optional.ofNullable(cpuOrderString).orElseThrow(() -> new RuntimeException("createNextPod2() pod " + lastPodName + " 缓存的 cpu 编号为空。"));
+            final String read = FileUtil.read(absolutePath);
+            final String replace = read.replace("cpu-order", "\"" + cpuOrderString + "\"");
+            FileUtil.writeStringToLocalFile(replace, absolutePath);
+            // 创建 pod
+            createPod3(yamlPathCacheKey);
+            log.info("createNextPod3() 创建项目 " + projectId + " 在节点 " + nodeName + " 的下一个 pod。");
+        }
+    }
+
 
     /**
      * @param nodeName       节点名称
@@ -184,7 +218,7 @@ public class ProjectUtil {
     }
 
     /**
-     * @param podYamlPath pod 文件内容
+     * @param podYamlPath yaml 文件地址
      */
     public void createPod2(String podYamlPath) {
         String nodeName = new File(podYamlPath).getName().split("#")[0];
@@ -193,6 +227,21 @@ public class ProjectUtil {
         new Thread(() -> KubernetesUtil.applyYaml(hostname, username, password, podYamlPath), "create-" + podName).start();
     }
 
+    /**
+     * @param redisKey yaml 地址的缓存 key
+     */
+    public void createPod3(String redisKey) {
+        final String podYamlPath = stringRedisTemplate.opsForValue().get(redisKey);
+        if (podYamlPath == null) {
+            throw new RuntimeException("createPod3() 根据缓存 key 获取 yaml 地址为 null:" + redisKey);
+        }
+        stringRedisTemplate.delete(redisKey);
+        String nodeName = new File(podYamlPath).getName().split("#")[0];
+        String podName = podYamlPath.split("#")[1].split("\\.")[0];
+        stringRedisTemplate.opsForValue().set("pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
+        new Thread(() -> KubernetesUtil.applyYaml(hostname, username, password, podYamlPath), "create-" + podName).start();
+    }
+
 
     public String getProjectTypeByProjectId(String projectId) {
         String projectType = null;

+ 3 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/entity/AlgorithmPO.java

@@ -1,5 +1,6 @@
 package com.css.simulation.resource.scheduler.dao.entity;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -8,6 +9,8 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 @AllArgsConstructor
 public class AlgorithmPO {
+
+    @JsonFormat
     private String id;
     private String algorithmCode;
     private String minioPath;

+ 8 - 8
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/manager/ProjectManager.java

@@ -59,6 +59,9 @@ public class ProjectManager {
     ) {
         String podName = projectUtil.getRandomPodName(projectId);   // 生成 podName
         String podYaml = projectUtil.getPodYamlName(nodeName, podName);     // 模板文件名称
+        String yamlPath = podYamlDirectory + podYaml;
+        String finalYaml;
+
         if ("1".equals(modelType)) {
             String podString = FileUtil.read(new File(vtdPodTemplateYaml));
             String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
@@ -86,7 +89,6 @@ public class ProjectManager {
                 replace14 = replace13;
             }
 
-            String finalYaml;
             if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
                 log.info("项目 " + projectId + " 使用 gpu 生成视频");
                 String replace15 = replace14.replace("vtd-image", kubernetesConfiguration.getImageVtdGpu());
@@ -98,9 +100,6 @@ public class ProjectManager {
             } else {
                 throw new RuntimeException("createTempYaml() 是否使用 gpu:" + isChoiceGpu);
             }
-            log.info("保存项目 " + projectId + " 的 yaml 文件:" + podYamlDirectory + podYaml);
-            FileUtil.writeStringToLocalFile(finalYaml, podYamlDirectory + podYaml);
-            return podYamlDirectory + podYaml;
         } else if ("2".equals(modelType)) {
             String podString = FileUtil.read(new File(carsimPodTemplateYaml));
             String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
@@ -134,7 +133,6 @@ public class ProjectManager {
                 replace19 = replace18;
             }
 
-            String finalYaml;
             if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
                 log.info("项目 " + projectId + " 使用 gpu 生成视频");
                 log.info("createTempYaml() k8s参数为:" + kubernetesConfiguration);
@@ -148,11 +146,13 @@ public class ProjectManager {
             } else {
                 throw new RuntimeException("createTempYaml() 是否使用 gpu:" + isChoiceGpu);
             }
-            log.info("保存项目 " + projectId + " 的 yaml 文件:" + podYamlDirectory + podYaml);
-            FileUtil.writeStringToLocalFile(finalYaml, podYamlDirectory + podYaml);
-            return podYamlDirectory + podYaml;
         } else {
             throw new RuntimeException("createTempYaml() 模型类型错误:" + modelType);
         }
+        log.info("保存项目 " + projectId + " 的 yaml 文件:" + yamlPath);
+        FileUtil.writeStringToLocalFile(finalYaml, yamlPath);
+        String yamlRedisKey = "project:" + projectId + ":node:" + nodeName + ":yaml:" + podName;
+        stringRedisTemplate.opsForValue().set(yamlRedisKey, yamlPath);
+        return yamlRedisKey;
     }
 }

+ 5 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/manager/TaskManager.java

@@ -150,7 +150,7 @@ public class TaskManager {
                     taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
                     // 查询项目是否使用 gpu 生成视频(0是1否)
                     String isChoiceGpu = projectUtil.getProjectByProjectId(projectId).getIsChoiceGpu();
-                    log.info("项目 " + projectId + " 是否需要生成 gpu 视频( 0是 1否 ):" + isChoiceGpu);
+                    log.info("isCompleted() 项目 " + projectId + " 是否需要生成 gpu 视频( 0是 1否 ):" + isChoiceGpu);
                     if ("1".equals(isChoiceGpu)) {
                         FutureTask<ResponseBodyVO<String>> videoTask = new FutureTask<>(() -> videoService.generateVideo(projectId, projectType, maxSimulationTime, taskId));
                         new Thread(videoTask, "video-" + StringUtil.getRandomEightBitUUID()).start();
@@ -159,10 +159,11 @@ public class TaskManager {
                 // -------------------------------- 判断项目是否结束 --------------------------------
                 result = taskLock.complete(redisPrefix, projectId, nodeName, podName);
                 if (!result) {
-                    projectUtil.createNextPod2(projectId, nodeName, podName);
+                    log.info("isCompleted() 项目 " + projectId + " 还未运行完成。");
+                    projectUtil.createNextPod3(projectId, nodeName, podName);
                 }
             } catch (Exception exception) {
-                log.info("TaskManager--isCompleted 报错。", exception);
+                log.info("isCompleted() 报错。", exception);
             }
         }
         return result;
@@ -466,7 +467,7 @@ public class TaskManager {
     }
 
     public void taskTick(String taskId) {
-        log.info("taskTick() 任务 " + taskId + "心跳!");
+        log.info("taskTick() 任务 " + taskId + " 心跳。");
         TaskPO taskPO = taskMapper.selectById(taskId);
         String projectId = taskPO.getPId();
         String userId = taskPO.getCreateUserId();

+ 7 - 10
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -298,18 +298,16 @@ public class ProjectService {
         PrefixTO redisPrefix = projectUtil.getRedisPrefixByProjectIdAndProjectType(projectId, projectType);
         final String projectRunningKeyPrefix = redisPrefix.getProjectRunningKey();
         final Set<String> projectRunningKeySet = RedisUtil.getKeySetByPrefix(stringRedisTemplate, projectRunningKeyPrefix);
-        boolean isRunning = true;
-        if (CollectionUtil.isEmpty(projectRunningKeySet)) {
-            isRunning = false;
-        }
+        boolean isRunning = !CollectionUtil.isEmpty(projectRunningKeySet);
 
 
         //2 删除 kafka 消息
         ApacheKafkaUtil.deleteTopic(kafkaAdminClient, projectId);
         //3 删除项目所有任务
         taskMapper.deleteByProject(projectId);
+
+        //4 根据 pod 前缀删除所有 pod
         if (isRunning){
-            //2 根据 pod 前缀删除所有 pod
             String podPrefix = "project-" + projectId;
             Set<String> nodeOfPodKeySet = RedisUtil.getKeySetByPrefix(stringRedisTemplate, "pod:" + podPrefix);
             for (String nodeOfPodKey : nodeOfPodKeySet) {
@@ -322,21 +320,20 @@ public class ProjectService {
             }
         }
 
-        //3 其他 redis key
+        //5 其他 redis key
         RedisUtil.deleteByPrefix(stringRedisTemplate, "project:" + projectId + ":package:");
         RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getProjectRunningKey());
         RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getProjectWaitingKey());
 
-        //4 删除项目 pod 启动文件
+        //6 删除项目 pod 启动文件
         FileUtil.deleteFileBySubstring(podYamlDirectory, projectId);
 
-        //5 删除项目临时文件
+        //7 删除项目临时文件
         FileUtil.rm(linuxTempPath + "project/" + projectId + "/");
 
-        //6 删除 minio 残留文件
+        //8 删除 minio 残留文件
         MinioUtil.rmR(minioClient, bucketName, projectResultPathOfMinio + projectId + "/");
 
-
         log.info("stopProject() 项目 " + projectId + " 终止成功!");
     }
 

+ 11 - 13
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/consumer/ProjectConsumer.java

@@ -258,7 +258,7 @@ public class ProjectConsumer {
                 }
             }
             taskManager.batchInsertTask(taskList);
-            log.info("createTaskAndFixData() 项目 " + projectId + " 共有" + taskList.size() + "个任务,已保存到数据库");
+            log.info("createTaskAndFixData() 项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
         } else if ("2".equals(modelType)) {
             log.info("createTaskAndFixData() 项目 " + projectId + " 开始查询模型。");
 
@@ -310,7 +310,6 @@ public class ProjectConsumer {
                     String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
                     MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
                     MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
-                    log.info("cacheManualProject() 已经将 xosc 上传到 minio 的结果文件目录:" + xoscPathOfMinio);
 
                     String scenarioOdr = scenePO.getScenarioOdr();
                     String[] splitXodr = scenarioOdr.split("/");
@@ -321,7 +320,6 @@ public class ProjectConsumer {
                     String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
                     MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
                     MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
-                    log.info("cacheManualProject() 已经将 xodr 上传到 minio 的结果文件目录:" + xodrPathOfMinio);
 
                     String scenarioOsgb = scenePO.getScenarioOsgb();
                     String[] splitOsgb = scenarioOsgb.split("/");
@@ -332,7 +330,7 @@ public class ProjectConsumer {
                     String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
                     MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
                     MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
-                    log.info("cacheManualProject() 已经将 osgb 上传到 minio 的结果文件目录:" + osgbPathOfMinio);
+                    log.info("cacheManualProject() 已经将 xosc、xodr、osgb 上传到 minio 的结果文件目录:" + projectResultPathOfMinio);
 
                     // 组装 task 消息
                     // carsim 不需要查询模型参数
@@ -365,7 +363,7 @@ public class ProjectConsumer {
                 }
             }
             taskManager.batchInsertTask(taskList);
-            log.info("createTaskAndFixData() 项目 " + projectId + " 共有" + taskList.size() + "个任务,已保存到数据库");
+            log.info("createTaskAndFixData() 项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
         }
 
         //* -------------------------------- 4 开始排队 --------------------------------
@@ -522,7 +520,7 @@ public class ProjectConsumer {
         ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1);   // 创建主题
         TimeUnit.SECONDS.sleep(7);
         // 需要即时启动的任务(并行度的大小)
-        CopyOnWriteArrayList<String> yamlListToRun = new CopyOnWriteArrayList<>();
+        CopyOnWriteArrayList<String> yamlToRunRedisKeyList = new CopyOnWriteArrayList<>();
         for (String taskJsonPath : taskJsonList) {
             String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
             //TODO 设置任务重试次数为 0,方便任务进行最大3次的重试。
@@ -567,19 +565,19 @@ public class ProjectConsumer {
 
             log.info("parseProject() 项目 " + projectId + " 准备创建 yaml:是否使用 gpu " + isChoiceGpu + ",当前节点名称为:" + currentNodeName + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前 cpu 编号为:" + cpuOrder);
             // 只有准备启动(即 currentCount == 0)的时候才指定 cpu 编号
-            String tempYaml = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, currentCount, cpuOrder);
+            String yamlRedisKey = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, currentCount, cpuOrder);
             if (currentCount == 0) {
-                log.info("parseProject() 加入到启动列表 " + tempYaml);
-                yamlListToRun.add(tempYaml);
+                log.info("parseProject() 加入到启动列表 " + yamlRedisKey);
+                yamlToRunRedisKeyList.add(yamlRedisKey);
             }
             messageNumber++;
         }
         TimeUnit.SECONDS.sleep(6);
-        log.info("parseProject() 项目 " + projectId + " 共发送了 " + messageNumber + " 条消息。 准备首先启动 " + yamlListToRun);
-        for (String yaml : yamlListToRun) {
-            projectUtil.createPod2(yaml);
+        log.info("parseProject() 项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
+        for (String redisKey : yamlToRunRedisKeyList) {
+            projectUtil.createPod3(redisKey);
         }
-        log.info("parseProject() 项目 " + projectId + " 已经启动 " + yamlListToRun);
+        log.info("parseProject() 项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
     }