root 2 gadi atpakaļ
vecāks
revīzija
b00c606927

+ 1 - 15
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -476,35 +476,21 @@ public class ProjectConsumer {
         String modelType = projectMessageDTO.getModelType();
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
         ProjectEntity projectEntity = projectUtil.getProjectByProjectId(projectId);
-        log.info("项目 " + projectId + " 信息为:" + projectEntity);
         String isChoiceGpu = projectEntity.getIsChoiceGpu();
-        // 项目类型
         int currentParallelism = projectMessageDTO.getCurrentParallelism();   // 当前并行度
-
-        // 场景测试包 id
-        // 结果视频的时长
-        // 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
         String projectPath = linuxTempPath + "project/" + projectId + "/";
-        // -------------------------------- 0 准备 --------------------------------
-//        projectService.prepare(projectMessageDTO, projectWaitingKey, projectRunningKey);
         // -------------------------------- 1 获取任务 json 列表 --------------------------------
         List<String> taskJsonList = FileUtil.listAbsolutePathByTypeAndLength(projectPath, "json", 37);
         int taskTotal = taskJsonList.size();
         projectMessageDTO.setTaskTotal(taskTotal);
         projectMessageDTO.setTaskCompleted(0);
-
         // 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
         //1 获取剩余并行度和即将使用的各node的并行度
         Map<String, Integer> nodeMap0 = projectUtil.getNodeMap();
         Map<String, Integer> nodeMap = projectUtil.getNodeMapToUse(Math.min(currentParallelism, taskTotal));
         //2 将指定 node 的并行度减少
-        nodeMap.keySet().forEach(nodeName -> {
-            int parallelismToUse = nodeMap.get(nodeName);
-            String restParallelismKey = "gpu-node:" + nodeName + ":parallelism";
-            int restParallelism = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(restParallelismKey))); // 剩余可用并行度
-            stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelismToUse) + "");
-        });
+        nodeMap.keySet().forEach(nodeName -> projectUtil.decrementParallelismOfGpuNode(nodeName, nodeMap.get(nodeName)));
         // 重新设置实际使用的并行度并保存到 redis
         int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
         projectMessageDTO.setCurrentParallelism(realCurrentParallelism);

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -315,7 +315,7 @@ public class ProjectService {
                 // 删除 pod
                 projectUtil.deletePod(podName);
                 // 节点并行度加一
-                projectUtil.returnOneParallelismToGpuNode(nodeName);
+                projectUtil.incrementOneParallelismOfGpuNode(nodeName);
             }
         }
 

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskManager.java

@@ -158,7 +158,7 @@ public class TaskManager {
             } else {
                 //如果项目已完成先把 pod 删除,并归还并行度
                 KubernetesUtil.deletePod2(apiClient, kubernetesConfiguration.getNamespace(), podName);
-                projectUtil.returnOneParallelismToGpuNode(nodeName);
+                projectUtil.incrementOneParallelismOfGpuNode(nodeName);
             }
             RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getTaskMessageKey());
             RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getTaskPodKey());

+ 13 - 33
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -147,7 +147,7 @@ public class ProjectUtil {
         if (CollectionUtil.isEmpty(yamlPathCacheKeySet)) {
             // 如果当前节点没有下一个yaml,则返回一个并行度。
             log.info("createNextPod3() 节点 " + nodeName + " 已经执行完被分配的项目 " + projectId + " 的所有 pod。");
-            returnOneParallelismToGpuNode(nodeName);
+            incrementOneParallelismOfGpuNode(nodeName);
         } else {
             final String yamlPathCacheKey = new ArrayList<>(yamlPathCacheKeySet).get(0);
             final String absolutePath = stringRedisTemplate.opsForValue().get(yamlPathCacheKey);
@@ -470,41 +470,21 @@ public class ProjectUtil {
                 .build();
     }
 
-    /**
-     * 将 redis 中的变量 +1,需要保证同步
-     *
-     * @param nodeName 节点名称
-     */
-    @Synchronized
-    public void returnOneParallelismToGpuNode(String nodeName) {
-        String key = "gpu-node:" + nodeName + ":parallelism";
-        String parallelismString = stringRedisTemplate.opsForValue().get(key);
-        if (StringUtil.isEmpty(parallelismString)) {
-            throw new RuntimeException("redisKey " + key + " 为空。");
-        }
-        final int parallelismBefore = Integer.parseInt(parallelismString);
-        final int parallelismAfter = parallelismBefore + 1;
-        stringRedisTemplate.opsForValue().set(key, parallelismAfter + "");
-        log.info("归还节点 " + nodeName + " 的 GPU 并行度:" + parallelismBefore + " --> " + parallelismAfter);
+
+    /**
+     * Returns a single unit of parallelism to the given GPU node.
+     * Delegates to {@code incrementParallelismOfGpuNode} with an amount of 1.
+     *
+     * @param nodeName name of the GPU node
+     */
+    public void incrementOneParallelismOfGpuNode(String nodeName) {
+        incrementParallelismOfGpuNode(nodeName, 1L);
     }
 
+    /**
+     * Returns {@code number} units of parallelism to a GPU node by incrementing
+     * the counter stored under the key {@code gpu-node:<nodeName>:parallelism}.
+     *
+     * @param nodeName name of the GPU node whose counter is incremented
+     * @param number   how many units of parallelism to add back
+     */
+    public void incrementParallelismOfGpuNode(String nodeName, long number) {
+        String key = "gpu-node:" + nodeName + ":parallelism";
+        // Pass the requested amount through: a hard-coded 1L ignored the argument
+        // and disagreed with the amount reported in the log line below.
+        customRedisClient.increment(key, number);
+        log.info("归还节点 {} 的 {} 个 GPU 并行度。", nodeName, number);
+    }
 
-    /**
-     * 将 redis 中的变量 +1,需要保证同步
-     *
-     * @param nodeName 节点名称
-     */
-    @Synchronized
-    public void returnOneParallelismToCpuNode(String nodeName) {
-        String key = "cpu-node:" + nodeName + ":parallelism";
-        String parallelismString = stringRedisTemplate.opsForValue().get(key);
-        if (StringUtil.isEmpty(parallelismString)) {
-            throw new RuntimeException("redisKey " + key + " 为空。");
-        }
-        final int parallelismBefore = Integer.parseInt(parallelismString);
-        final int parallelismAfter = parallelismBefore + 1;
-        stringRedisTemplate.opsForValue().set(key, parallelismAfter + "");
-        log.info("归还节点 " + nodeName + " 的 CPU 并行度:" + parallelismBefore + " --> " + parallelismAfter);
+    /**
+     * Takes {@code number} units of parallelism from a GPU node by decrementing
+     * the counter stored under the key {@code gpu-node:<nodeName>:parallelism}.
+     *
+     * @param nodeName name of the GPU node whose counter is decremented
+     * @param number   how many units of parallelism to subtract
+     */
+    public void decrementParallelismOfGpuNode(String nodeName, long number) {
+        String key = "gpu-node:" + nodeName + ":parallelism";
+        customRedisClient.decrement(key, number);
+        log.info("获取节点 {} 的 {} 个 GPU 并行度。", nodeName, number);
     }
 
 

+ 18 - 4
simulation-resource-video/src/main/java/com/css/simulation/resource/video/configuration/redis/CustomRedisClient.java

@@ -1,6 +1,5 @@
 package com.css.simulation.resource.video.configuration.redis;
 
-import api.common.util.CollectionUtil;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.core.StringRedisTemplate;
@@ -18,7 +17,7 @@ public class CustomRedisClient {
     @Resource
     private StringRedisTemplate stringRedisTemplate;
 
-    //* -------------------------------- Comment --------------------------------
+    //* -------------------------------- 基本操作 --------------------------------
 
     public Set<String> keys(String pattern) {
         return stringRedisTemplate.keys(pattern);
@@ -34,7 +33,7 @@ public class CustomRedisClient {
 
     public void flushdb() {
         Set<String> keys = stringRedisTemplate.keys("*");
-        if (CollectionUtil.isNotEmpty(keys)) {
+        if (keys != null && keys.size() != 0) { // only call delete when the "*" lookup returned at least one key
             stringRedisTemplate.delete(keys);
         }
     }
@@ -43,6 +42,21 @@ public class CustomRedisClient {
         stringRedisTemplate.delete(key);
     }
 
+    /**
+     * Increments the value stored at {@code key} by {@code number}.
+     * The result returned by the underlying template call is discarded.
+     *
+     * @param key    key whose value is incremented
+     * @param number amount to add
+     */
+    public void increment(String key, long number) {
+        stringRedisTemplate.opsForValue().increment(key, number);
+    }
+
+    /**
+     * Decrements the value stored at {@code key} by {@code number}.
+     * The result returned by the underlying template call is discarded.
+     *
+     * @param key    key whose value is decremented
+     * @param number amount to subtract
+     */
+    public void decrement(String key, long number) {
+        stringRedisTemplate.opsForValue().decrement(key, number);
+    }
+
+
     //* -------------------------------- 分布式锁 --------------------------------
 
     /**
@@ -71,6 +85,6 @@ public class CustomRedisClient {
      * 解锁
      */
     public void unlock(String name) {
-        stringRedisTemplate.delete(name);
+        delete(name); // remove the key named by the lock through this class's own delete helper rather than the template directly
     }
 }

+ 58 - 54
simulation-resource-video/src/main/java/com/css/simulation/resource/video/service/VideoService.java

@@ -92,67 +92,71 @@ public class VideoService {
             parallelism = Integer.parseInt(customRedisClient.get(cpuNodeParallelismKey));
             TimeUnit.SECONDS.sleep(7);
         }
-        customRedisClient.set(cpuNodeParallelismKey, (parallelism - 1) + "");
+        String lockName = "video-lock-" + taskId;
+        customRedisClient.lock(lockName, Long.parseLong(maxSimulationTime) + 10L, 1000L);
+        customRedisClient.decrement(cpuNodeParallelismKey, 1);
+        try {
+            String rootDirectoryPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/";
+            String xodrPathOfMinio = rootDirectoryPathOfMinio + taskId + ".xodr";
+            String osgbPathOfMinio = rootDirectoryPathOfMinio + taskId + ".osgb";
+            String csv1PathOfMinio = rootDirectoryPathOfMinio + csv1Name;
+            String csv2PathOfMinio = rootDirectoryPathOfMinio + csv2Name;
+            String rootDirectoryPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/";
+            String xodrPathOfLinux = rootDirectoryPathOfLinux + taskId + ".xodr";
+            String osgbPathOfLinux = rootDirectoryPathOfLinux + taskId + ".osgb";
+            String csv1PathOfLinux = rootDirectoryPathOfLinux + csv1Name;
+            String csv2PathOfLinux = rootDirectoryPathOfLinux + csv2Name;
+            log.info("下载 csv、xodr、osgb。");
+            MinioUtil.downloadToFile(minioClient, bucketName, xodrPathOfMinio, xodrPathOfLinux);
+            MinioUtil.downloadToFile(minioClient, bucketName, osgbPathOfMinio, osgbPathOfLinux);
+            MinioUtil.downloadToFile(minioClient, bucketName, csv1PathOfMinio, csv1PathOfLinux);
+            MinioUtil.downloadToFile(minioClient, bucketName, csv2PathOfMinio, csv2PathOfLinux);
 
+            log.info("生成 xosc 文件。");
+            String xoscPath = generateXosc(rootDirectoryPathOfLinux, xodrPathOfLinux, osgbPathOfLinux, projectId, projectType);
+            log.info("启动虚拟窗口并通过 esmini 截图。");
+            String pictureDirectoryPath = rootDirectoryPathOfLinux + "picture";
+            FileUtil.createDirectory(pictureDirectoryPath);
+            String esminiCommandTemp = esminiCommand + " " + xoscPath + " " + pictureDirectoryPath + "/screenshot " + StringUtil.doubleToString(Double.parseDouble(maxSimulationTime), 2);
+            LinuxUtil.execute(XvfbCommand);
+            LinuxUtil.execute(esminiCommandTemp);
+            log.info("删除 esmini 进程。");
+            LinuxUtil.kill(esminiCommandTemp);
+            int num = 14;
+            for (int i = 0; i < num; i++) {
+                String remove = "rm -f " + pictureDirectoryPath + "/screenshot_0000" + i + ".tga";
+                log.info("删除图片:" + remove);
+                LinuxUtil.execute(remove);
+            }
+            log.info("生成视频。");
+            String videoTargetPathOfLinux = rootDirectoryPathOfLinux + "video/" + videoName;
+            FileUtil.createParentDirectory(videoTargetPathOfLinux);
+            String videoTargetPathOfMinio = rootDirectoryPathOfMinio + videoName;
 
-        String rootDirectoryPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/";
-        String xodrPathOfMinio = rootDirectoryPathOfMinio + taskId + ".xodr";
-        String osgbPathOfMinio = rootDirectoryPathOfMinio + taskId + ".osgb";
-        String csv1PathOfMinio = rootDirectoryPathOfMinio + csv1Name;
-        String csv2PathOfMinio = rootDirectoryPathOfMinio + csv2Name;
-        String rootDirectoryPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/";
-        String xodrPathOfLinux = rootDirectoryPathOfLinux + taskId + ".xodr";
-        String osgbPathOfLinux = rootDirectoryPathOfLinux + taskId + ".osgb";
-        String csv1PathOfLinux = rootDirectoryPathOfLinux + csv1Name;
-        String csv2PathOfLinux = rootDirectoryPathOfLinux + csv2Name;
-        log.info("下载 csv、xodr、osgb。");
-        MinioUtil.downloadToFile(minioClient, bucketName, xodrPathOfMinio, xodrPathOfLinux);
-        MinioUtil.downloadToFile(minioClient, bucketName, osgbPathOfMinio, osgbPathOfLinux);
-        MinioUtil.downloadToFile(minioClient, bucketName, csv1PathOfMinio, csv1PathOfLinux);
-        MinioUtil.downloadToFile(minioClient, bucketName, csv2PathOfMinio, csv2PathOfLinux);
-
-        log.info("生成 xosc 文件。");
-        String xoscPath = generateXosc(rootDirectoryPathOfLinux, xodrPathOfLinux, osgbPathOfLinux, projectId, projectType);
-        log.info("启动虚拟窗口并通过 esmini 截图。");
-        String pictureDirectoryPath = rootDirectoryPathOfLinux + "picture";
-        FileUtil.createDirectory(pictureDirectoryPath);
-        String esminiCommandTemp = esminiCommand + " " + xoscPath + " " + pictureDirectoryPath + "/screenshot " + StringUtil.doubleToString(Double.parseDouble(maxSimulationTime), 2);
-        LinuxUtil.execute(XvfbCommand);
-        LinuxUtil.execute(esminiCommandTemp);
-        log.info("删除 esmini 进程。");
-        LinuxUtil.kill(esminiCommandTemp);
-        int num = 14;
-        for (int i = 0; i < num; i++) {
-            String remove = "rm -f " + pictureDirectoryPath + "/screenshot_0000" + i + ".tga";
-            log.info("删除图片:" + remove);
-            LinuxUtil.execute(remove);
-        }
-        log.info("生成视频。");
-        String videoTargetPathOfLinux = rootDirectoryPathOfLinux + "video/" + videoName;
-        FileUtil.createParentDirectory(videoTargetPathOfLinux);
-        String videoTargetPathOfMinio = rootDirectoryPathOfMinio + videoName;
-
-        String execute = LinuxUtil.execute("ffmpeg"
-                + " -f image2 -framerate 30 "
-                + "-start_number " + num
-                + " -i " + pictureDirectoryPath + "/screenshot_%05d.tga"
-                + " -c:v libx264 -vf format=yuv420p -crf 20 "
-                + videoTargetPathOfLinux
-        );
-        //6 将视频上传到 minio
-        MinioUtil.uploadFromFile(minioClient, videoTargetPathOfLinux, bucketName, videoTargetPathOfMinio);
-        log.info("上传成功:" + videoTargetPathOfMinio);
-        //删除生成的临时文件
-        String removeAll = "rm -rf " + pictureDirectoryPath;
-        log.info("删除全部图片==" + removeAll);
+            String execute = LinuxUtil.execute("ffmpeg"
+                    + " -f image2 -framerate 30 "
+                    + "-start_number " + num
+                    + " -i " + pictureDirectoryPath + "/screenshot_%05d.tga"
+                    + " -c:v libx264 -vf format=yuv420p -crf 20 "
+                    + videoTargetPathOfLinux
+            );
+            //6 将视频上传到 minio
+            MinioUtil.uploadFromFile(minioClient, videoTargetPathOfLinux, bucketName, videoTargetPathOfMinio);
+            log.info("上传成功:" + videoTargetPathOfMinio);
+            //删除生成的临时文件
+            String removeAll = "rm -rf " + pictureDirectoryPath;
+            log.info("删除全部图片==" + removeAll);
+            LinuxUtil.execute(removeAll);
+            //* -------------------------------- 删除临时文件 --------------------------------
 //        FileUtil.rm(xodrPathOfLinux);
 //        FileUtil.rm(osgbPathOfLinux);
 //        FileUtil.rm(csv1PathOfLinux);
 //        FileUtil.rm(csv2PathOfLinux);
 
-        //* -------------------------------- 删除临时文件 --------------------------------
-        LinuxUtil.execute(removeAll);
-        customRedisClient.set(cpuNodeParallelismKey, (Integer.parseInt(customRedisClient.get(cpuNodeParallelismKey)) + 1) + "");
+        } finally {
+            // Give back the CPU parallelism slot taken just before the try block.
+            // Decrementing here again would consume a second slot on every run and never return any.
+            customRedisClient.increment(cpuNodeParallelismKey, 1);
+            customRedisClient.unlock(lockName);
+        }
     }
 
     /**