root 2 年之前
父节点
当前提交
112b184612

+ 5 - 0
pom.xml

@@ -94,6 +94,11 @@
             <!-- kubernetes 客户端 -  结束 -->
 
             <!-- 缓存 - 开始 -->
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-data-redis</artifactId>
+                <version>${spring-boot.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-pool2</artifactId>

+ 2 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/configuration/redis/CustomRedisClient.java

@@ -1,6 +1,7 @@
 package com.css.simulation.resource.scheduler.common.configuration.redis;
 
 import api.common.util.CollectionUtil;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.stereotype.Component;
 
@@ -10,6 +11,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 @Component
+@Slf4j
 public class CustomRedisClient {
 
     @Resource

+ 3 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/util/KubernetesUtil.java

@@ -227,9 +227,9 @@ public class KubernetesUtil {
      */
     public static void deletePod2(ApiClient apiClient, String namespaceName, String podName) throws InterruptedException, ApiException {
         deletePodSync(apiClient, namespaceName, podName);
-        log.info("deletePod2() 等待 pod " + podName + " 的资源释放完成。");
+        log.info("等待 pod " + podName + " 的资源释放完成。");
         TimeUnit.SECONDS.sleep(7);
-        log.info("deletePod2()  pod " + podName + " 资源释放完成。");
+        log.info("pod " + podName + " 资源释放完成。");
     }
 
     /**
@@ -241,7 +241,7 @@ public class KubernetesUtil {
      * @param podName       pod 名称
      */
     public static void deletePodSync(ApiClient apiClient, String namespaceName, String podName) throws ApiException, InterruptedException {
-        log.info("deletePodSync() 删除 " + namespaceName + ":" + podName);
+        log.info("删除命名空间 {} 的 pod:{}", namespaceName, podName);
         CoreV1Api coreV1Api = new CoreV1Api(apiClient);
         coreV1Api.deleteNamespacedPod(podName, namespaceName, null, null, null, null, null, null);
         // 检查是否删除完毕

+ 6 - 5
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/manager/TaskManager.java

@@ -120,7 +120,7 @@ public class TaskManager {
     public boolean isProjectCompleted(PrefixTO redisPrefix, String projectId, String projectType, String maxSimulationTime, String taskId, String state, String podName) {
         boolean result = false;
         String nodeName = projectUtil.getNodeNameOfPod(projectId, podName);
-        if ("Running".equals(state)) {  // 运行中的 pod 无需删除
+        if (DictConstants.TASK_RUNNING.equals(state)) {  // 运行中的 pod 无需删除
             // 将运行中的任务的 pod 名称放入 redis
             stringRedisTemplate.opsForValue().set(redisPrefix.getTaskPodKey(), podName);
             taskTick(taskId); // 刷新一下心跳
@@ -131,7 +131,7 @@ public class TaskManager {
             // -------------------------------- 处理状态 --------------------------------
             try {
                 log.info("修改任务 " + taskId + "的状态为 " + state + ",pod 名称为:" + podName + ",并删除 pod。");
-                if ("Aborted".equals(state)) {
+                if (DictConstants.TASK_ABORTED.equals(state)) {
                     String minioPathOfErrorLog = resultPathMinio + projectId + "/" + taskId + "error.log";
                     boolean objectExist = MinioUtil.isObjectExist(minioClient, bucketName, minioPathOfErrorLog);
                     String targetEvaluate;
@@ -153,9 +153,9 @@ public class TaskManager {
                         targetEvaluate = DictConstants.TASK_ERROR_REASON_2;
                     }
                     taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), targetEvaluate);
-                } else if ("Terminated".equals(state)) {
+                } else if (DictConstants.TASK_TERMINATED.equals(state)) {
                     taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_3);
-                } else if ("PendingAnalysis".equals(state)) {
+                } else if (DictConstants.TASK_ANALYSIS.equals(state)) {
                     taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
                     // 查询项目是否使用 gpu 生成视频(0是1否)
                     String isChoiceGpu = projectUtil.getProjectByProjectId(projectId).getIsChoiceGpu();
@@ -163,8 +163,9 @@ public class TaskManager {
                         log.info("项目 {} 使用 GPU 生成视频。", projectId);
                     } else if (DictConstants.VIDEO_CPU.equals(isChoiceGpu)) {
                         log.info("项目 {} 使用 CPU 生成视频。", projectId);
-                        final Boolean success = customRedisClient.getDistributedLock("project:" + projectId + ":task:" + taskId + ":generateVideo:");   // 分布式锁
+                        final Boolean success = customRedisClient.getDistributedLock("project:" + projectId + ":task:" + taskId + ":sendRequestToGenerateVideo:");   // 分布式锁
                         if (Boolean.TRUE.equals(success)) {
+                            log.info("项目 {} 的任务 {} 获取锁成功。", projectId, taskId);
                             FutureTask<ResponseBodyVO<String>> videoTask = new FutureTask<>(() -> videoService.generateVideo(projectId, projectType, maxSimulationTime, taskId));
                             new Thread(videoTask, "generateVideo-" + StringUtil.getRandomEightBitUUID()).start();
                         }

+ 10 - 0
simulation-resource-video/pom.xml

@@ -17,6 +17,16 @@
     </properties>
 
     <dependencies>
+        <!-- 缓存 - 开始 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+        </dependency>
+        <!-- 缓存 - 结束 -->
 
         <!-- minio - 开始 -->
         <dependency>

+ 51 - 0
simulation-resource-video/src/main/java/com/css/simulation/resource/video/common/configuration/redis/CustomRedisClient.java

@@ -0,0 +1,51 @@
+package com.css.simulation.resource.video.common.configuration.redis;
+
+import api.common.util.CollectionUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Component
+@Slf4j
+public class CustomRedisClient {
+
+    @Resource
+    private StringRedisTemplate stringRedisTemplate;
+
+    public void deleteByKey(String key) {
+        stringRedisTemplate.delete(key);
+    }
+
+    public void deleteByKeyCollection(Collection<String> keyCollection) {
+        stringRedisTemplate.delete(keyCollection);
+    }
+
+    public void deleteByPrefix(String prefix) {
+        Set<String> keySetByPrefix = getKeySetByPrefix(prefix);
+        deleteByKeyCollection(keySetByPrefix);
+    }
+
+    public String getStringByKey(String key) {
+        return stringRedisTemplate.opsForValue().get(key);
+    }
+
+    public Set<String> getKeySetByPrefix(String prefix) {
+        return stringRedisTemplate.keys(prefix + "*");
+    }
+
+    public void flushdb() {
+        Set<String> keys = stringRedisTemplate.keys("*");
+        if (CollectionUtil.isNotEmpty(keys)) {
+            stringRedisTemplate.delete(keys);
+        }
+    }
+
+    public Boolean getDistributedLock(String key) {
+        return stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10L, TimeUnit.SECONDS);
+    }
+}

+ 84 - 0
simulation-resource-video/src/main/java/com/css/simulation/resource/video/common/configuration/redis/RedisTemplateConfiguration.java

@@ -0,0 +1,84 @@
+package com.css.simulation.resource.video.common.configuration.redis;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
+import org.springframework.cache.CacheManager;
+import org.springframework.cache.annotation.EnableCaching;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.cache.RedisCacheConfiguration;
+import org.springframework.data.redis.cache.RedisCacheManager;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
+import org.springframework.data.redis.serializer.RedisSerializationContext;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@EnableCaching
+public class RedisTemplateConfiguration {
+
+    @Bean
+    public RedisSerializer<Object> jackson2JsonRedisSerializer() {
+        // 使用 Jackson2JsonRedisSerializer 来序列化和反序列化 redis 的 value 值。
+        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
+        ObjectMapper mapper = new ObjectMapper();
+        // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
+        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
+        // 指定序列化输入的类型,类必须是非 final 修饰的,final修饰的类,比如 String, Integer 等会跑出异常
+        mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
+        serializer.setObjectMapper(mapper);
+        return serializer;
+    }
+
+    @Bean
+    public CacheManager cacheManager(RedisConnectionFactory connectionFactory) {
+        RedisCacheConfiguration configuration = RedisCacheConfiguration.defaultCacheConfig();
+        configuration = configuration
+                // 设置 key 为 string 序列化
+                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
+                // 设置 value 为 json 序列化
+                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer()))
+                // 不缓存空值
+                .disableCachingNullValues()
+                // 设置缓存默认过期时间(30 分钟)
+                .entryTtl(Duration.ofMinutes(30L))
+        ;
+        // 特殊缓存空间应用不同的配置
+        Map<String, RedisCacheConfiguration> map = new HashMap<>();
+        map.put("miFirst", configuration.entryTtl(Duration.ofMinutes(30L)));
+        map.put("miSecond", configuration.entryTtl(Duration.ofHours(1L)));
+
+        return RedisCacheManager.builder(connectionFactory)
+                .cacheDefaults(configuration)           // 默认配置
+                .withInitialCacheConfigurations(map)    // 特殊缓存
+                .transactionAware()                     // 事务
+                .build();
+    }
+
+    @Bean
+    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
+        RedisTemplate<String, Object> template = new RedisTemplate<>();
+        template.setConnectionFactory(connectionFactory);
+        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
+        // key 采用 String 的序列化方式
+        template.setKeySerializer(stringRedisSerializer);
+        // hash 的 key 采用 String 的序列化方式
+        template.setHashKeySerializer(stringRedisSerializer);
+        // value 采用 jackson 的序列化方式
+        template.setValueSerializer(jackson2JsonRedisSerializer());
+        // hash 的 value 采用 jackson 的序列化方式
+        template.setHashValueSerializer(jackson2JsonRedisSerializer());
+        template.afterPropertiesSet();
+        return template;
+    }
+
+
+}

+ 11 - 1
simulation-resource-video/src/main/java/com/css/simulation/resource/video/controller/VideoController.java

@@ -3,6 +3,7 @@ package com.css.simulation.resource.video.controller;
 import api.common.pojo.common.ResponseBodyVO;
 import api.common.pojo.po.scene.VehicleTypePO;
 import api.common.util.LinuxUtil;
+import com.css.simulation.resource.video.common.configuration.redis.CustomRedisClient;
 import com.css.simulation.resource.video.service.VideoService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
@@ -23,6 +24,8 @@ public class VideoController {
 
     @Resource
     private VideoService videoService;
+    @Resource
+    private CustomRedisClient customRedisClient;
     @Value("${scheduler.video-test-command}")
     private String testCommand;
 
@@ -38,7 +41,14 @@ public class VideoController {
             @RequestParam("taskId") String taskId
     ) {
         log.info("收到项目 " + projectId + " 的任务 " + taskId + " 生成 cpu 视频的请求。");
-        return videoService.generateVideo(projectId, projectType, maxSimulationTime, taskId);
+        final Boolean success = customRedisClient.getDistributedLock("project:" + projectId + ":task:" + taskId + ":receiveRequestToGenerateVideo:");   // 分布式锁
+        if (Boolean.TRUE.equals(success)) {
+            return videoService.generateVideo(projectId, projectType, maxSimulationTime, taskId);
+        }
+        {
+            return new ResponseBodyVO<>(ResponseBodyVO.Response.CLIENT_FAILURE, "重复任务。", null);
+        }
+
     }