root hace 2 años
padre
commit
aa60cf87b8

+ 4 - 0
api-common/src/main/java/api/common/pojo/constants/DictConstants.java

@@ -141,4 +141,8 @@ public class DictConstants {
     public static final String EVALUATION_LEVEL_P = "P"; // 超管使用此集群id执行项目
     public static final String EVALUATION_LEVEL_P_DESCRIPTION = "较差(P)"; // 超管使用此集群id执行项目
 
+
+    public static final String VIDEO_GPU = "0";
+    public static final String VIDEO_CPU = "1";
+
 }

+ 6 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/configuration/redis/CustomRedisClient.java

@@ -1,6 +1,7 @@
 package com.css.simulation.resource.scheduler.common.configuration.redis;
 
 import api.common.util.CollectionUtil;
+import lombok.Synchronized;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.stereotype.Component;
 
@@ -41,4 +42,9 @@ public class CustomRedisClient {
             stringRedisTemplate.delete(keys);
         }
     }
+
+    @Synchronized
+    public Boolean getDistributedLock(String key) {
+        return stringRedisTemplate.opsForValue().setIfAbsent(key, "1");
+    }
 }

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/util/ApacheKafkaUtil.java

@@ -40,7 +40,7 @@ public class ApacheKafkaUtil {
      */
     public static void deleteTopic(Admin admin, String... topics) {
         admin.deleteTopics(Arrays.asList(topics));
-        log.info("ApacheKafkaUtil.deleteTopic() 删除主题:" + Arrays.toString(topics));
+        log.info("删除主题:" + Arrays.toString(topics));
     }
 
 

+ 5 - 5
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/util/ProjectUtil.java

@@ -240,7 +240,7 @@ public class ProjectUtil {
      */
     public Map<String, Integer> getNodeMap() {
         List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
-        log.info("getNodeMap() 预设并行度的节点列表为:" + initialNodeList);
+        log.info("预设并行度的节点列表为:" + initialNodeList);
         Map<String, Integer> resultNodeMap = new HashMap<>();    // 用于执行的节点映射(节点名,并行度)
         for (KubernetesNodeTO kubernetesNodeSource : initialNodeList) {
             KubernetesNodeTO kubernetesNodeCopy = kubernetesNodeSource.clone();
@@ -257,7 +257,7 @@ public class ProjectUtil {
             }
             resultNodeMap.put(nodeName, restParallelism);
         }
-        log.info("getNodeMap() 剩余并行度的节点列表为:" + resultNodeMap);
+        log.info("剩余并行度的节点列表为:" + resultNodeMap);
         return resultNodeMap;
     }
 
@@ -301,7 +301,7 @@ public class ProjectUtil {
      */
     public Map<String, Integer> getNodeMapToUse(int parallelism) {
         List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
-        log.info("ProjectUtil--getNodeMapToUse 预设并行度的节点列表为:" + initialNodeList);
+        log.info("预设并行度的节点列表为:" + initialNodeList);
         // 遍历所有节点,获取还有剩余并行度的节点
         List<KubernetesNodeTO> restNodeList = new ArrayList<>();    // 剩余并行度的节点列表
         for (KubernetesNodeTO kubernetesNodeSource : initialNodeList) {
@@ -322,7 +322,7 @@ public class ProjectUtil {
                 restNodeList.add(kubernetesNodeCopy);
             }
         }
-        log.info("ProjectUtil--getNodeMapToUse 剩余并行度的节点列表为:" + restNodeList);
+        log.info("剩余并行度的节点列表为:" + restNodeList);
         Map<String, Integer> resultNodeMap = new HashMap<>();    // 用于执行的节点映射(节点名,并行度)
         if (!CollectionUtil.isEmpty(restNodeList)) {
             if (restNodeList.size() == 1) {
@@ -345,7 +345,7 @@ public class ProjectUtil {
                 }
             }
         }
-        log.info("ProjectUtil--getNodeMapToUse 即将使用节点的并行度为:" + resultNodeMap);
+        log.info("即将使用节点的并行度为:" + resultNodeMap);
         return resultNodeMap;
     }
 

+ 13 - 6
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/manager/TaskManager.java

@@ -4,6 +4,7 @@ import api.common.pojo.common.ResponseBodyVO;
 import api.common.pojo.constants.DictConstants;
 import api.common.util.*;
 import com.css.simulation.resource.scheduler.common.configuration.kubernetes.KubernetesConfiguration;
+import com.css.simulation.resource.scheduler.common.configuration.redis.CustomRedisClient;
 import com.css.simulation.resource.scheduler.common.resource.TaskLock;
 import com.css.simulation.resource.scheduler.common.util.*;
 import com.css.simulation.resource.scheduler.dao.entity.IndexTemplatePO;
@@ -99,6 +100,8 @@ public class TaskManager {
     private ApiClient apiClient;
     @Resource(name = "myKafkaAdmin")
     private Admin admin;
+    @Resource
+    private CustomRedisClient customRedisClient;
 
     public void batchInsertTask(List<TaskPO> taskPOList) {
         try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
@@ -116,18 +119,18 @@ public class TaskManager {
     @SneakyThrows
     public boolean isProjectCompleted(PrefixTO redisPrefix, String projectId, String projectType, String maxSimulationTime, String taskId, String state, String podName) {
         boolean result = false;
-        String nodeName = projectUtil.getNodeNameOfPod(projectId,podName);
+        String nodeName = projectUtil.getNodeNameOfPod(projectId, podName);
         if ("Running".equals(state)) {  // 运行中的 pod 无需删除
             // 将运行中的任务的 pod 名称放入 redis
             stringRedisTemplate.opsForValue().set(redisPrefix.getTaskPodKey(), podName);
             taskTick(taskId); // 刷新一下心跳
-            log.info("state() 修改任务 " + taskId + " 的状态为 " + state + ",pod 名称为:" + podName);
+            log.info("修改任务 " + taskId + " 的状态为 " + state + ",pod 名称为:" + podName);
             taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
             return false;
         } else { // 结束的 pod 都直接删除,并判断项目是否完成
             // -------------------------------- 处理状态 --------------------------------
             try {
-                log.info("state() 修改任务 " + taskId + "的状态为 " + state + ",pod 名称为:" + podName + ",并删除 pod。");
+                log.info("修改任务 " + taskId + "的状态为 " + state + ",pod 名称为:" + podName + ",并删除 pod。");
                 if ("Aborted".equals(state)) {
                     String minioPathOfErrorLog = resultPathMinio + projectId + "/" + taskId + "error.log";
                     boolean objectExist = MinioUtil.isObjectExist(minioClient, bucketName, minioPathOfErrorLog);
@@ -156,13 +159,17 @@ public class TaskManager {
                     taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
                     // 查询项目是否使用 gpu 生成视频(0是1否)
                     String isChoiceGpu = projectUtil.getProjectByProjectId(projectId).getIsChoiceGpu();
-                    log.info("项目 " + projectId + " 是否需要生成 gpu 视频( 0是 1否 ):" + isChoiceGpu);
-                    if ("1".equals(isChoiceGpu)) {
-                        final Boolean success = stringRedisTemplate.opsForValue().setIfAbsent("project:" + projectId + ":task:" + taskId + ":generateVideo:", "1");   // 分布式锁
+                    if (DictConstants.VIDEO_GPU.equals(isChoiceGpu)) {
+                        log.info("项目 {} 使用 GPU 生成视频。", projectId);
+                    } else if (DictConstants.VIDEO_CPU.equals(isChoiceGpu)) {
+                        log.info("项目 {} 使用 CPU 生成视频。", projectId);
+                        final Boolean success = customRedisClient.getDistributedLock("project:" + projectId + ":task:" + taskId + ":generateVideo:");   // 分布式锁
                         if (Boolean.TRUE.equals(success)) {
                             FutureTask<ResponseBodyVO<String>> videoTask = new FutureTask<>(() -> videoService.generateVideo(projectId, projectType, maxSimulationTime, taskId));
                             new Thread(videoTask, "generateVideo-" + StringUtil.getRandomEightBitUUID()).start();
                         }
+                    } else {
+                        throw new RuntimeException("未设置视频生成。");
                     }
                 }
                 // -------------------------------- 判断项目是否结束 --------------------------------

+ 3 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -37,10 +37,10 @@ public class TaskService {
 
     @SneakyThrows
     public void taskState(String taskId, String state, String podName) {
-        log.info("TaskService--state 接收到参数为:taskId=" + taskId + ",state=" + state + ",podName=" + podName);
+        log.info("接收到参数为:taskId=" + taskId + ",state=" + state + ",podName=" + podName);
         TaskPO taskPO = taskMapper.selectById(taskId);
         if (taskPO == null) {
-            log.error("TaskManager--isProjectCompleted 接收到已删除但还在执行的任务:taskId=" + taskId + ",state=" + state + ",podName=" + podName);
+            log.error("接收到已删除但还在执行的任务:taskId=" + taskId + ",state=" + state + ",podName=" + podName);
             return;
         }
         String projectId = taskPO.getPId(); // 项目 id
@@ -57,7 +57,7 @@ public class TaskService {
         SshClient sshClient = SshUtil.getClient();
         ClientSession clientSession = SshUtil.getSession(sshClient, hostname, username, password);
         //2 准备打分
-        log.info("项目 " + projectId + "准备打分。");
+        log.info("项目 " + projectId + " 准备打分。");
         taskManager.prepareScore(redisPrefix.getProjectRunningKey());
         //3 打分
         taskManager.score(userId, projectId, projectType, clientSession);

+ 1 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/feign/VideoService.java

@@ -3,6 +3,7 @@ package com.css.simulation.resource.scheduler.service.feign;
 import api.common.pojo.common.ResponseBodyVO;
 import com.css.simulation.resource.scheduler.service.feign.fallback.VideoServiceFallBack;
 import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 

+ 15 - 15
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/consumer/ProjectConsumer.java

@@ -100,7 +100,7 @@ public class ProjectConsumer {
     public void createTaskAndFixData(ConsumerRecord<String, String> projectRecord) {
         //* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
         String initialProjectJson = projectRecord.value();
-        log.info("createTaskAndFixData() 接收到项目开始消息为:" + initialProjectJson);
+        log.info("接收到项目开始消息为:" + initialProjectJson);
         ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(initialProjectJson, ProjectMessageDTO.class);
         String projectId = projectMessageDTO.getProjectId();                // 手动执行项目 id 或 自动执行子项目 id
         String modelType = projectMessageDTO.getModelType();                // 模型类型,1 动力学模型 2 carsim模型
@@ -122,7 +122,7 @@ public class ProjectConsumer {
         indexMapper.deleteFirstTargetScoreByProjectId(projectId);
         indexMapper.deleteLastTargetScoreByProjectId(projectId);
         // -------------------------------- 1 查询场景 --------------------------------
-        log.info("createTaskAndFixData() 项目 " + projectId + " 开始查询场景。");
+        log.info("项目 " + projectId + " 开始查询场景。");
         //根据场景测试包 packageId,拿到场景集合(包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
         List<ScenePO> scenePOList = projectService.getSceneList(projectId, packageId);
         int taskTotal = scenePOList.size();
@@ -130,20 +130,20 @@ public class ProjectConsumer {
         projectMessageDTO.setTaskCompleted(0);
         //去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
         Set<ScenePO> scenePOSet = new HashSet<>(scenePOList);
-        log.info("createTaskAndFixData() 项目 " + projectId + " 场景包括:" + scenePOSet);
+        log.info("项目 " + projectId + " 场景包括:" + scenePOSet);
         // -------------------------------- 2 算法导入 --------------------------------
-        log.info("createTaskAndFixData() 项目 " + projectId + " 开始算法导入。");
+        log.info("项目 " + projectId + " 开始算法导入。");
         String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
-        log.info("createTaskAndFixData() 项目 " + projectId + " 算法已导入:" + algorithmDockerImage);
+        log.info("项目 " + projectId + " 算法已导入:" + algorithmDockerImage);
         // -------------------------------- 3 查询模型 --------------------------------
         if ("1".equals(modelType)) {
-            log.info("createTaskAndFixData() 项目 " + projectId + " 开始查询模型。");
+            log.info("项目 " + projectId + " 开始查询模型。");
             //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
             VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
             List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
             List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
             // -------------------------------- 4 保存任务消息 --------------------------------
-            log.info("createTaskAndFixData() 项目 " + projectId + " 开始保存任务消息。");
+            log.info("项目 " + projectId + " 开始保存任务消息。");
             List<TaskPO> taskList = new ArrayList<>();
             for (ScenePO scenePO : scenePOSet) {
                 String sceneId = scenePO.getId();
@@ -155,7 +155,7 @@ public class ProjectConsumer {
                     lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
                 }
                 if (CollectionUtil.isEmpty(lastTargetIdList)) {
-                    throw new RuntimeException("createTaskAndFixData() 项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
+                    throw new RuntimeException("项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
                 }
                 for (String lastTargetId : lastTargetIdList) {
                     String taskId = StringUtil.getRandomUUID();
@@ -252,19 +252,19 @@ public class ProjectConsumer {
                                     .build())
                             .build();
                     FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskTO), projectPath + taskId + ".json");
-                    log.info("createTaskAndFixData() 项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskTO.getInfo().getTask_id());
+                    log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskTO.getInfo().getTask_id());
                 }
             }
             taskManager.batchInsertTask(taskList);
-            log.info("createTaskAndFixData() 项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
+            log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
         } else if ("2".equals(modelType)) {
-            log.info("createTaskAndFixData() 项目 " + projectId + " 开始查询模型。");
+            log.info("项目 " + projectId + " 开始查询模型。");
 
             VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
             List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
             List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
             // -------------------------------- 4 保存任务消息 --------------------------------
-            log.info("createTaskAndFixData() 项目 " + projectId + " 开始保存任务消息。");
+            log.info("项目 " + projectId + " 开始保存任务消息。");
             List<TaskPO> taskList = new ArrayList<>();
             for (ScenePO scenePO : scenePOSet) {
                 String sceneId = scenePO.getId();
@@ -276,7 +276,7 @@ public class ProjectConsumer {
                     lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
                 }
                 if (CollectionUtil.isEmpty(lastTargetIdList)) {
-                    throw new RuntimeException("createTaskAndFixData() 项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
+                    throw new RuntimeException("项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
                 }
                 for (String lastTargetId : lastTargetIdList) {
                     String taskId = StringUtil.getRandomUUID();
@@ -357,11 +357,11 @@ public class ProjectConsumer {
                             .build();
 
                     FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskTO), projectPath + taskId + ".json");
-                    log.info("createTaskAndFixData() 项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskTO.getInfo().getTask_id());
+                    log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskTO.getInfo().getTask_id());
                 }
             }
             taskManager.batchInsertTask(taskList);
-            log.info("createTaskAndFixData() 项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
+            log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
         }
 
         //* -------------------------------- 4 开始排队 --------------------------------

+ 1 - 1
simulation-resource-server/src/main/java/com/css/simulation/resource/util/ApacheKafkaUtil.java

@@ -40,7 +40,7 @@ public class ApacheKafkaUtil {
      */
     public static void deleteTopic(Admin admin, String... topics) {
         admin.deleteTopics(Arrays.asList(topics));
-        log.info("ApacheKafkaUtil.deleteTopic() 删除主题:" + Arrays.toString(topics));
+        log.info("删除主题:" + Arrays.toString(topics));
     }
 
 

+ 1 - 1
simulation-resource-video/src/main/java/com/css/simulation/resource/video/controller/VideoController.java

@@ -37,7 +37,7 @@ public class VideoController {
             @RequestParam("maxSimulationTime") String maxSimulationTime,
             @RequestParam("taskId") String taskId
     ) {
-        log.info("收到项目 " + projectId + " 的任务" + taskId + " 生成 cpu 视频的请求。");
+        log.info("收到项目 " + projectId + " 的任务 " + taskId + " 生成 cpu 视频的请求。");
         return videoService.generateVideo(projectId, projectType, maxSimulationTime, taskId);
     }
 

+ 7 - 12
simulation-resource-video/src/main/java/com/css/simulation/resource/video/service/VideoService.java

@@ -75,7 +75,6 @@ public class VideoService {
      */
     @SneakyThrows
     public ResponseBodyVO<String> generateVideo(String projectId, String projectType, String maxSimulationTime, String taskId) {
-        //1 获取两个 csv 的目录,和 xodr 和 osgb 三个路径
         String rootDirectoryPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/";
         String xodrPathOfMinio = rootDirectoryPathOfMinio + taskId + ".xodr";
         String osgbPathOfMinio = rootDirectoryPathOfMinio + taskId + ".osgb";
@@ -86,32 +85,28 @@ public class VideoService {
         String osgbPathOfLinux = rootDirectoryPathOfLinux + taskId + ".osgb";
         String csv1PathOfLinux = rootDirectoryPathOfLinux + csv1Name;
         String csv2PathOfLinux = rootDirectoryPathOfLinux + csv2Name;
-        //2 下载 csv、xodr、osgb
+        log.info("下载 csv、xodr、osgb。");
         MinioUtil.downloadToFile(minioClient, bucketName, xodrPathOfMinio, xodrPathOfLinux);
         MinioUtil.downloadToFile(minioClient, bucketName, osgbPathOfMinio, osgbPathOfLinux);
         MinioUtil.downloadToFile(minioClient, bucketName, csv1PathOfMinio, csv1PathOfLinux);
         MinioUtil.downloadToFile(minioClient, bucketName, csv2PathOfMinio, csv2PathOfLinux);
 
-        //3 生成 xosc 文件
+        log.info("生成 xosc 文件。");
         String xoscPath = generateXosc(rootDirectoryPathOfLinux, xodrPathOfLinux, osgbPathOfLinux, projectId, projectType);
-        // 启动虚拟窗口
-        log.info("生成 xosc 文件路径:" + xoscPath);
-        //4 生成图片
+        log.info("启动虚拟窗口并通过 esmini 截图。");
         String pictureDirectoryPath = rootDirectoryPathOfLinux + "picture";
         FileUtil.createDirectory(pictureDirectoryPath);
         String esminiCommandTemp = esminiCommand + " " + xoscPath + " " + pictureDirectoryPath + "/screenshot " + StringUtil.doubleToString(Double.parseDouble(maxSimulationTime), 2);
         LinuxUtil.execute2(XvfbCommand, esminiCommandTemp);
-        LinuxUtil.kill(esminiCommandTemp);  // 执行完手动删除 esmini 进程
-//        String esminiResult = LinuxUtil.execute(esminiCommandTemp);
+        log.info("删除 esmini 进程。");
+        LinuxUtil.kill(esminiCommandTemp);
         int num = 14;
         for (int i = 0; i < num; i++) {
             String remove = "rm -f " + pictureDirectoryPath + "/screenshot_0000" + i + ".tga";
-            log.info("删除图片==" + remove);
+            log.info("删除图片" + remove);
             LinuxUtil.execute(remove);
         }
-
-
-        //5 生成视频
+        log.info("生成视频。");
         String videoTargetPathOfLinux = rootDirectoryPathOfLinux + "video/" + videoName;
         FileUtil.createParentDirectory(videoTargetPathOfLinux);
         String videoTargetPathOfMinio = rootDirectoryPathOfMinio + videoName;