root 2 years ago
parent
commit
190693654e

+ 9 - 3
simulation-resource-common/src/main/java/com/css/simulation/resource/common/controller/MinioController.java

@@ -153,14 +153,20 @@ public class MinioController {
     @RequestMapping("/getPreviewUrl")
     @RequestMapping("/getPreviewUrl")
     public ResponseBodyVO<String> getPreviewUrl(@RequestBody @Validated MinioParameter minioParameter) {
     public ResponseBodyVO<String> getPreviewUrl(@RequestBody @Validated MinioParameter minioParameter) {
 //        String previewUrl = MinioUtil.getPreviewUrl(minioClientPublic, Method.GET, minioConfiguration.getBucketName(), minioParameter.getObjectName());
 //        String previewUrl = MinioUtil.getPreviewUrl(minioClientPublic, Method.GET, minioConfiguration.getBucketName(), minioParameter.getObjectName());
-        String previewUrl = MinioUtil.getPublicPreviewUrl(minioClientPrivate, Method.GET, minioConfiguration.getBucketName(), minioParameter.getObjectName(),
-                minioConfiguration.getEndpointPrivate(), minioConfiguration.getEndpointPublic());
+        String previewUrl = MinioUtil.getPublicPreviewUrl(
+                minioClientPrivate,
+                Method.GET,
+                minioConfiguration.getBucketName(),
+                minioParameter.getObjectName(),
+                minioConfiguration.getEndpointPrivate(),
+                minioConfiguration.getEndpointPublic()
+        );
         return new ResponseBodyVO<>(ResponseBodyVO.Response.SUCCESS, "请求成功!", previewUrl);
         return new ResponseBodyVO<>(ResponseBodyVO.Response.SUCCESS, "请求成功!", previewUrl);
     }
     }
 
 
     @RequestMapping("/createMultipartUpload")
     @RequestMapping("/createMultipartUpload")
     @ResponseBody
     @ResponseBody
-    public ResponseBodyVO<Map<String, Object>> createMultipartUpload(@RequestBody Map<String, String> paramMap) throws Exception {
+    public ResponseBodyVO<Map<String, Object>> createMultipartUpload(@RequestBody Map<String, String> paramMap) {
         try {
         try {
             String objectName = paramMap.get("objectName");
             String objectName = paramMap.get("objectName");
             String chunkSize = paramMap.get("chunkSize");
             String chunkSize = paramMap.get("chunkSize");

+ 14 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/configuration/init/MyApplicationRunner.java

@@ -0,0 +1,14 @@
+package com.css.simulation.resource.scheduler.common.configuration.init;
+
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MyApplicationRunner implements ApplicationRunner {
+
+    @Override
+    public void run(ApplicationArguments args) {
+        System.out.println("ApplicationRunner");
+    }
+}

+ 10 - 7
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/manager/TaskManager.java

@@ -156,26 +156,29 @@ public class TaskManager {
                     taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
                     taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
                     // 查询项目是否使用 gpu 生成视频(0是1否)
                     // 查询项目是否使用 gpu 生成视频(0是1否)
                     String isChoiceGpu = projectUtil.getProjectByProjectId(projectId).getIsChoiceGpu();
                     String isChoiceGpu = projectUtil.getProjectByProjectId(projectId).getIsChoiceGpu();
-                    log.info("isCompleted() 项目 " + projectId + " 是否需要生成 gpu 视频( 0是 1否 ):" + isChoiceGpu);
+                    log.info("项目 " + projectId + " 是否需要生成 gpu 视频( 0是 1否 ):" + isChoiceGpu);
                     if ("1".equals(isChoiceGpu)) {
                     if ("1".equals(isChoiceGpu)) {
-                        FutureTask<ResponseBodyVO<String>> videoTask = new FutureTask<>(() -> videoService.generateVideo(projectId, projectType, maxSimulationTime, taskId));
-                        new Thread(videoTask, "video-" + StringUtil.getRandomEightBitUUID()).start();
+                        final Boolean success = stringRedisTemplate.opsForValue().setIfAbsent("task:" + taskId + ":generateVideo:", "1");   // 分布式锁
+                        if (Boolean.TRUE.equals(success)) {
+                            FutureTask<ResponseBodyVO<String>> videoTask = new FutureTask<>(() -> videoService.generateVideo(projectId, projectType, maxSimulationTime, taskId));
+                            new Thread(videoTask, "generateVideo-" + StringUtil.getRandomEightBitUUID()).start();
+                        }
                     }
                     }
                 }
                 }
                 // -------------------------------- 判断项目是否结束 --------------------------------
                 // -------------------------------- 判断项目是否结束 --------------------------------
                 result = taskLock.complete(redisPrefix, projectId, nodeName, podName);
                 result = taskLock.complete(redisPrefix, projectId, nodeName, podName);
                 if (!result) {
                 if (!result) {
-                    log.info("isCompleted() 项目 " + projectId + " 还未运行完成。");
+                    log.info("项目 " + projectId + " 还未运行完成。");
                     projectUtil.createNextPod3(projectId, nodeName, podName);
                     projectUtil.createNextPod3(projectId, nodeName, podName);
                 } else {
                 } else {
                     //如果项目已完成先把 pod 删除,并归还并行度
                     //如果项目已完成先把 pod 删除,并归还并行度
                     KubernetesUtil.deletePod2(apiClient, kubernetesConfiguration.getNamespace(), podName);
                     KubernetesUtil.deletePod2(apiClient, kubernetesConfiguration.getNamespace(), podName);
                     projectUtil.addOneParallelismToNode(nodeName);
                     projectUtil.addOneParallelismToNode(nodeName);
                 }
                 }
-                RedisUtil.deleteByPrefix(stringRedisTemplate,redisPrefix.getTaskMessageKey());
-                RedisUtil.deleteByPrefix(stringRedisTemplate,redisPrefix.getTaskPodKey());
+                RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getTaskMessageKey());
+                RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getTaskPodKey());
             } catch (Exception exception) {
             } catch (Exception exception) {
-                log.info("isCompleted() 报错。", exception);
+                log.info("报错。", exception);
             }
             }
         }
         }
         return result;
         return result;

+ 49 - 60
simulation-resource-server/src/main/java/com/css/simulation/resource/project/impl/SimulationProjectServiceImpl.java

@@ -366,73 +366,62 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
             }, failure -> {
             }, failure -> {
                 log.error("发送消息失败:" + failure.getMessage());
                 log.error("发送消息失败:" + failure.getMessage());
             });
             });
-        } catch (NumberFormatException | JsonProcessingException e) {
+        } catch (NumberFormatException e) {
             throw new RuntimeException(e);
             throw new RuntimeException(e);
         }
         }
     }
     }
 
 
     private void projectStopToKafka(SimulationManualProjectPo po) {
     private void projectStopToKafka(SimulationManualProjectPo po) {
-        try {
-            SimulationManualProjectKafkaParam kafkaParam = new SimulationManualProjectKafkaParam();
-            kafkaParam.setProjectId(po.getId());
-            kafkaParam.setType(DictConstants.PROJECT_TYPE_MANUAL);
-            KafkaParameter kafkaParameter = new KafkaParameter();
-            kafkaParameter.setTopic(ProjectConstants.STOP_TASK_TOPIC);
-            String data = JsonUtil.beanToJson(kafkaParam);
-            kafkaParameter.setData(data);
-            log.info("推送项目中止消息到kafka:" + data);
+        SimulationManualProjectKafkaParam kafkaParam = new SimulationManualProjectKafkaParam();
+        kafkaParam.setProjectId(po.getId());
+        kafkaParam.setType(DictConstants.PROJECT_TYPE_MANUAL);
+        KafkaParameter kafkaParameter = new KafkaParameter();
+        kafkaParameter.setTopic(ProjectConstants.STOP_TASK_TOPIC);
+        String data = JsonUtil.beanToJson(kafkaParam);
+        kafkaParameter.setData(data);
+        log.info("推送项目中止消息到kafka:" + data);
 //        kafkaService.send(kafkaParameter);
 //        kafkaService.send(kafkaParameter);
-            kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
-                // 消息发送到的topic
-                String topic = success.getRecordMetadata().topic();
-                // 消息发送到的分区
-                int partition = success.getRecordMetadata().partition();
-                // 消息在分区内的offset
-                long offset = success.getRecordMetadata().offset();
-                log.info("------- 发送消息成功:\n"
-                        + "主题 topic 为:" + topic + "\n"
-                        + "分区 partition 为:" + partition + "\n"
-                        + "偏移量为:" + offset + "\n"
-                        + "消息体为:" + kafkaParameter.getData());
-            }, failure -> {
-                log.error("发送消息失败:" + failure.getMessage());
-            });
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
+        kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
+            // 消息发送到的topic
+            String topic = success.getRecordMetadata().topic();
+            // 消息发送到的分区
+            int partition = success.getRecordMetadata().partition();
+            // 消息在分区内的offset
+            long offset = success.getRecordMetadata().offset();
+            log.info("------- 发送消息成功:\n"
+                    + "主题 topic 为:" + topic + "\n"
+                    + "分区 partition 为:" + partition + "\n"
+                    + "偏移量为:" + offset + "\n"
+                    + "消息体为:" + kafkaParameter.getData());
+        }, failure -> {
+            log.error("发送消息失败:" + failure.getMessage());
+        });
 
 
     }
     }
 
 
     private void autoProjectStopToKafka(SimulationAutomaticSubProjectPo po) {
     private void autoProjectStopToKafka(SimulationAutomaticSubProjectPo po) {
-        try {
-            SimulationManualProjectKafkaParam kafkaParam = new SimulationManualProjectKafkaParam();
-            kafkaParam.setProjectId(po.getId());
-            kafkaParam.setType(DictConstants.PROJECT_TYPE_AUTO_SUB);
-            KafkaParameter kafkaParameter = new KafkaParameter();
-            kafkaParameter.setTopic(ProjectConstants.STOP_TASK_TOPIC);
-            String data = JsonUtil.beanToJson(kafkaParam);
-            kafkaParameter.setData(data);
-            log.info("推送自动项目中止消息到kafka:" + data);
+        SimulationManualProjectKafkaParam kafkaParam = new SimulationManualProjectKafkaParam();
+        kafkaParam.setProjectId(po.getId());
+        kafkaParam.setType(DictConstants.PROJECT_TYPE_AUTO_SUB);
+        KafkaParameter kafkaParameter = new KafkaParameter();
+        kafkaParameter.setTopic(ProjectConstants.STOP_TASK_TOPIC);
+        String data = JsonUtil.beanToJson(kafkaParam);
+        kafkaParameter.setData(data);
+        log.info("推送自动项目中止消息到kafka:" + data);
 //        kafkaService.send(kafkaParameter);
 //        kafkaService.send(kafkaParameter);
-            kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
-                // 消息发送到的topic
-                String topic = success.getRecordMetadata().topic();
-                // 消息发送到的分区
-                int partition = success.getRecordMetadata().partition();
-                // 消息在分区内的offset
-                long offset = success.getRecordMetadata().offset();
-                log.info("------- 发送消息成功:\n"
-                        + "主题 topic 为:" + topic + "\n"
-                        + "分区 partition 为:" + partition + "\n"
-                        + "偏移量为:" + offset + "\n"
-                        + "消息体为:" + kafkaParameter.getData());
-            }, failure -> {
-                log.error("发送消息失败:" + failure.getMessage());
-            });
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
-
+        kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
+            // 消息发送到的topic
+            String topic = success.getRecordMetadata().topic();
+            // 消息发送到的分区
+            int partition = success.getRecordMetadata().partition();
+            // 消息在分区内的offset
+            long offset = success.getRecordMetadata().offset();
+            log.info("------- 发送消息成功:\n"
+                    + "主题 topic 为:" + topic + "\n"
+                    + "分区 partition 为:" + partition + "\n"
+                    + "偏移量为:" + offset + "\n"
+                    + "消息体为:" + kafkaParameter.getData());
+        }, failure -> log.error("发送消息失败:" + failure.getMessage()));
     }
     }
 
 
 
 
@@ -959,10 +948,10 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
     public ResponseBodyVO<ProjectTaskDetailsVo> selectProjectTaskById(SimulationManualProjectParam param) {
     public ResponseBodyVO<ProjectTaskDetailsVo> selectProjectTaskById(SimulationManualProjectParam param) {
         ProjectTaskDetailsVo resultVo = new ProjectTaskDetailsVo();
         ProjectTaskDetailsVo resultVo = new ProjectTaskDetailsVo();
         //1 获取参数
         //1 获取参数
-        Optional.ofNullable(param).orElseThrow(() -> new RuntimeException("参数不能为空"));
-        Optional.ofNullable(param.getId()).orElseThrow(() -> new RuntimeException("项目 id 不能为空"));
-        Optional.ofNullable(param.getTaskId()).orElseThrow(() -> new RuntimeException("任务 id 不能为空"));
-        Optional.ofNullable(param.getTaskId()).orElseThrow(() -> new RuntimeException("projectType 不能为空"));
+        Optional.ofNullable(param).orElseThrow(() -> new RuntimeException("参数不能为空"));
+        Optional.ofNullable(param.getId()).orElseThrow(() -> new RuntimeException("项目 id 不能为空"));
+        Optional.ofNullable(param.getTaskId()).orElseThrow(() -> new RuntimeException("任务 id 不能为空"));
+        Optional.ofNullable(param.getProjectType()).orElseThrow(() -> new RuntimeException("项目类型不能为空。"));
         String id = param.getId();
         String id = param.getId();
         String taskId = param.getTaskId();
         String taskId = param.getTaskId();
         String projectType = param.getProjectType();    // 2
         String projectType = param.getProjectType();    // 2