root 2 سال پیش
والد
کامیت
6e9f248e99

+ 9 - 6
api-common/src/main/java/api/common/util/JsonUtil.java

@@ -27,10 +27,13 @@ public class JsonUtil {
      * @param bean bean 对象
      * @param <T>  声明为泛型方法
      * @return json 字符串
-     * @throws JsonProcessingException 异常
      */
-    public static <T> String beanToJson(T bean) throws JsonProcessingException {
-        return new ObjectMapper().writeValueAsString(bean);
+    public static <T> String beanToJson(T bean) {
+        try {
+            return new ObjectMapper().writeValueAsString(bean);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /**
@@ -100,15 +103,15 @@ public class JsonUtil {
         return new TextNode("");
     }
 
-//泛化用的,把带下划线的json转换为驼峰jsom
-    public static String  stringToJson(String ss) throws JsonProcessingException {
+    //泛化用的,把带下划线的json转换为驼峰jsom
+    public static String stringToJson(String ss) throws JsonProcessingException {
         Map maps = jsonToMap(ss);
         Map map = new HashMap();
         for (Object obj : maps.keySet()) {
             String value = underlineToHump(obj.toString());
             map.put(value, maps.get(obj));
         }
-       return beanToJson(map);
+        return beanToJson(map);
     }
 
     private static Pattern UNDERLINE_PATTERN = Pattern.compile("_([a-z])");

+ 9 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/util/ProjectUtil.java

@@ -529,4 +529,13 @@ public class ProjectUtil {
         stringRedisTemplate.opsForValue().set(key, parallelismAfter + "");
         log.info("addOneParallelismToNode() 归还节点 " + nodeName + " 并行度:" + parallelismBefore + " --> " + parallelismAfter);
     }
+
+    public List<String> getWaitingProjectMessageKeys() {
+        final Set<String> keys = stringRedisTemplate.keys("*");
+        if (CollectionUtil.isEmpty(keys)) {
+            return new ArrayList<>();
+        } else {
+            return keys.stream().filter(key -> key.contains("waiting") && key.contains("message")).collect(Collectors.toList());
+        }
+    }
 }

+ 0 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/manager/TaskManager.java

@@ -126,7 +126,6 @@ public class TaskManager {
             return false;
         } else { // 结束的 pod 都直接删除,并判断项目是否完成
             // -------------------------------- 处理状态 --------------------------------
-            // TODO 暂时不用重试操作
             try {
                 log.info("state() 修改任务 " + taskId + "的状态为 " + state + ",pod 名称为:" + podName + ",并删除 pod。");
                 if ("Aborted".equals(state)) {

+ 0 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/consumer/ProjectConsumer.java

@@ -587,7 +587,6 @@ public class ProjectConsumer {
      * @param projectWaitingKey 项目等待 key
      * @param projectMessageDTO 项目信息
      */
-    @SneakyThrows
     public void wait(String projectWaitingKey, ProjectMessageDTO projectMessageDTO) {
         stringRedisTemplate.opsForValue().set(projectWaitingKey, JsonUtil.beanToJson(projectMessageDTO));
     }

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/advice/AllExceptionHandlerAdvice.java → simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/controller/advice/AllExceptionHandlerAdvice.java

@@ -1,4 +1,4 @@
-package com.css.simulation.resource.scheduler.web.advice;
+package com.css.simulation.resource.scheduler.web.controller.advice;
 
 import api.common.pojo.common.ResponseBodyVO;
 import lombok.extern.slf4j.Slf4j;

+ 105 - 201
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/scheduler/ProjectScheduler.java

@@ -1,209 +1,113 @@
-//package com.css.simulation.resource.scheduler.web.scheduler;
-//
-//import api.common.pojo.constants.DictConstants;
-//import api.common.pojo.dto.ProjectMessageDTO;
-//import api.common.util.CollectionUtil;
-//import api.common.util.JsonUtil;
-//import api.common.util.StringUtil;
-//import com.css.simulation.resource.scheduler.common.configuration.kubernetes.KubernetesConfiguration;
-//import com.css.simulation.resource.scheduler.common.util.ProjectUtil;
-//import com.css.simulation.resource.scheduler.common.util.RedisUtil;
-//import com.css.simulation.resource.scheduler.dao.entity.ClusterPO;
-//import com.css.simulation.resource.scheduler.dao.entity.ProjectPO;
-//import com.css.simulation.resource.scheduler.dao.entity.UserPO;
-//import com.css.simulation.resource.scheduler.dao.mapper.*;
-//import com.css.simulation.resource.scheduler.service.TaskService;
-//import com.css.simulation.resource.scheduler.service.domain.PrefixTO;
-//import com.css.simulation.resource.scheduler.web.consumer.ProjectConsumer;
-//import lombok.SneakyThrows;
-//import lombok.extern.slf4j.Slf4j;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.data.redis.core.StringRedisTemplate;
-//import org.springframework.scheduling.annotation.Scheduled;
-//import org.springframework.stereotype.Component;
-//
-//import javax.annotation.Resource;
-//import java.util.ArrayList;
-//import java.util.List;
-//import java.util.Set;
-//
-//@Component
-//@Slf4j
-//public class ProjectScheduler {
-//
-//    @Value("${scheduler.host.hostname}")
-//    String hostname;
-//    @Value("${scheduler.host.username}")
-//    String username;
-//    @Value("${scheduler.host.password}")
-//    String password;
-//    // -------------------------------- Comment --------------------------------
-//    @Resource
-//    StringRedisTemplate stringRedisTemplate;
-//    @Resource
-//    TaskService taskService;
-//    @Resource
-//    TaskMapper taskMapper;
-//    @Resource
-//    ClusterMapper clusterMapper;
-//    @Resource
-//    ManualProjectMapper manualProjectMapper;
-//    @Resource
-//    AutoSubProjectMapper autoSubProjectMapper;
-//    @Resource
-//    ProjectConsumer projectConsumer;
-//    @Resource
-//    ProjectUtil projectUtil;
-//    @Resource
-//    UserMapper userMapper;
-//    @Resource
-//    KubernetesConfiguration kubernetesConfiguration;
-//
-//
+package com.css.simulation.resource.scheduler.web.scheduler;
+
+import api.common.pojo.dto.ProjectMessageDTO;
+import api.common.util.CollectionUtil;
+import api.common.util.JsonUtil;
+import com.css.simulation.resource.scheduler.common.util.ProjectUtil;
+import com.css.simulation.resource.scheduler.common.util.RedisUtil;
+import com.css.simulation.resource.scheduler.web.consumer.ProjectConsumer;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.List;
+
+@Component
+@Slf4j
+public class ProjectScheduler {
+
+    @Value("${scheduler.host.hostname}")
+    private String hostname;
+    @Value("${scheduler.host.username}")
+    private String username;
+    @Value("${scheduler.host.password}")
+    private String password;
+    // -------------------------------- Comment --------------------------------
+    @Resource
+    private StringRedisTemplate stringRedisTemplate;
+    @Resource
+    private ProjectConsumer projectConsumer;
+    @Resource
+    private ProjectUtil projectUtil;
+
+
+    /**
+     * 调度项目启动
+     */
+    @Scheduled(fixedDelay = 60 * 1000)
+    public void dispatchProject() {
+        log.info("//1 查询等待中的项目任务。");
+        List<String> projectMessageKeys = projectUtil.getWaitingProjectMessageKeys();
+        if (!CollectionUtil.isEmpty(projectMessageKeys)) {
+            for (String projectMessageKey : projectMessageKeys) {
+                final String projectMessage = RedisUtil.getStringByKey(stringRedisTemplate, projectMessageKey);
+                ConsumerRecord<String, String> projectRecord = new ConsumerRecord<>("", 0, 0L, null, projectMessage);
+                projectConsumer.acceptMessage(projectRecord);
+            }
+        }
+    }
+
+
+    @SneakyThrows
+    public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey) {
+        ProjectMessageDTO projectMessageDTO;
+        try {
+            projectMessageDTO = JsonUtil.jsonToBean(RedisUtil.getStringByKey(stringRedisTemplate, projectWaitingKey), ProjectMessageDTO.class);
+        } catch (Exception e) {
+            log.info("run 等待执行的项目信息已经从 redis 删除。");
+            return;
+        }
+        //1 获取所有节点的剩余可用并行度
+        int restParallelism = projectUtil.getRestParallelism();
+        if (restParallelism == 0) {
+            log.info("run 集群中没有可用并行度,项目 " + projectId + " 继续等待。");
+            return;
+        }
+        //2 判断剩余可用并行度是否大于项目并行度,否则继续等待
+        if (restParallelism > 0L) {
+            log.info("run 集群 " + clusterId + " 执行项目项目 " + projectId);
+            projectMessageDTO.setCurrentParallelism(restParallelism);    // 设置实际的并行度
+            projectConsumer.parseProject(projectMessageDTO, projectRunningKey);
+        }
+    }
+
+
 //    /**
-//     * 调度项目启动
+//     * 处理 pod 超时
+//     * 同时也可处理 pod 莫名关闭,因为关闭之后也会超时
 //     */
 //    @Scheduled(fixedDelay = 60 * 1000)
-//    @SneakyThrows
-//    public void dispatchProject() {
-//
-//        //1 查询已经在页面上点击运行的项目(后面会判断是排队中还是已经与运行了)
-//        List<ProjectPO> manualProjectList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
-//        List<ProjectPO> autoSubProjectList = autoSubProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
-//        List<ProjectPO> allProject = new ArrayList<>();
-//        allProject.addAll(manualProjectList);
-//        allProject.addAll(autoSubProjectList);
-//
-//
-//        for (ProjectPO project : allProject) {
-//            String projectId = project.getId();
-//            int parallelism = Integer.parseInt(project.getParallelism());
-//            String userId = project.getCreateUserId();
-//            UserPO userPO = userMapper.selectById(userId);
-//            String roleCode = userPO.getRoleCode();
-//            String useType = userPO.getUseType();
-//            ClusterPO clusterPO = null;
-//            String clusterId;
-//            boolean isSystem = false;
-//            if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
-//                clusterId = DictConstants.SYSTEM_CLUSTER_ID;
-//                isSystem = true;
-//            } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
-//                clusterPO = clusterMapper.selectByUserId(userId);
-//                if (clusterPO == null) {
-//                    log.error("dispatchProject 项目 " + projectId + " 的创建用户 " + userId + " 没有分配集群!");
-//                    return;
-//                }
-//                clusterId = clusterPO.getId();
-//            } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
-//                if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) {   //3-3 普通子账户,根据自己的独占节点排队
-//                    clusterPO = clusterMapper.selectByUserId(userId);
-//                } else {    //3-4 共享子账户,根据父账户的共享节点排队
-//                    String parentUserId = userPO.getCreateUserId();
-//                    clusterPO = clusterMapper.selectByUserId(parentUserId);
-//                }
-//                if (clusterPO == null) {
-//                    log.error("dispatchProject 项目 " + projectId + " 的创建用户 " + userId + " 没有分配集群!");
-//                    return;
-//                }
-//                clusterId = clusterPO.getId();
-//            } else {
-//                log.error("ProjectConsumer--dispatchProject 项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
-//                return;
-//            }
-//            PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
-//            // --------------------------------  判断项目是否已经在执行,如果执行则 continue --------------------------------
-//            if (StringUtil.isNotEmpty(stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()))) {
+//    @Transactional
+//    public void taskTimeout() {
+//        //1 查询任务信息(需要过滤已经删除的项目,因为页面上删除项目不会删除任务)
+//        List<TaskPO> executingTaskList = taskMapper.selectByRunStateAndProjectIsNotDeleted(DictConstants.TASK_RUNNING);
+//        if (CollectionUtil.isEmpty(executingTaskList)) {
+//            return;
+//        }
+//        log.info("taskTimeout 正在运行的任务有:" + executingTaskList);
+//        for (TaskPO task : executingTaskList) {
+//            String userId = task.getCreateUserId();
+//            String projectId = task.getPId();
+//            String taskId = task.getId();
+//            PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaskId(userId, projectId, taskId);
+//            // 获取心跳时间
+//            String tickTime = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskTickKey());
+//            if (StringUtil.isEmpty(tickTime)) {
+//                log.error(redisPrefix.getTaskTickKey() + ",该 key 的心跳时间为空!");
 //                continue;
 //            }
-//            // -------------------------------- 项目没有执行说明等待中 --------------------------------
-//            if (isSystem) { // 系统管理员直接执行
-//                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
-//                return;
+//            long lastTickTime = Long.parseLong(tickTime);
+//            // 如果心跳超时则更改任务状态为 Aborted
+//            if (TimeUtil.getNow() - lastTickTime > Long.parseLong(kubernetesConfiguration.getPodTimeout())) {
+//                String podName = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskPodKey());
+//                taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
 //            }
-//            int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
-//            // 获取该用户正在运行的项目数量
-//            Set<String> keySetOfAllProjectOfCluster = RedisUtil.getKeySetByPrefix(stringRedisTemplate, redisPrefix.getClusterRunningPrefix());
-//            List<String> keyListOfRunningProject = null;
-//            // cluster:${clusterId}:running:${projectId}
-//            if (CollectionUtil.isNotEmpty(keySetOfAllProjectOfCluster)) {
-//                keyListOfRunningProject = projectUtil.getRunningProjectList(keySetOfAllProjectOfCluster);   // 筛选出运行中的项目信息 (key 为 cluster:${cluster}:running:${projectId})
-//                if (CollectionUtil.isNotEmpty(keyListOfRunningProject)) {
-//                    log.info("dispatchProject 运行中的项目的 key 有:" + keyListOfRunningProject);
-//                    long parallelismSum = 0;
-//                    for (String keyOfRunningProject : keyListOfRunningProject) {
-//                        parallelismSum += JsonUtil.jsonToBean(RedisUtil.getStringByKey(stringRedisTemplate, keyOfRunningProject), ProjectMessageDTO.class).getParallelism();
-//                    }
-//                    if (parallelismSum < simulationLicenseNumber) { // 已经运行的项目小于集群允许运行的项目则,运行新的项目
-//                        if (parallelismSum + parallelism < simulationLicenseNumber) {
-//                            run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
-//                            return;
-//                        }
-//                    }
-//                }
-//            }
-//            if ((CollectionUtil.isEmpty(keySetOfAllProjectOfCluster) || CollectionUtil.isEmpty(keyListOfRunningProject)) && parallelism < simulationLicenseNumber) {
-//                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
-//            }
-//        }
-//    }
-//
-//
-//    @SneakyThrows
-//    public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey) {
-//        ProjectMessageDTO projectMessageDTO;
-//        try {
-//            projectMessageDTO = JsonUtil.jsonToBean(RedisUtil.getStringByKey(stringRedisTemplate, projectWaitingKey), ProjectMessageDTO.class);
-//        } catch (Exception e) {
-//            log.info("run 等待执行的项目信息已经从 redis 删除。");
-//            return;
-//        }
-//        //1 获取所有节点的剩余可用并行度
-//        int restParallelism = projectUtil.getRestParallelism();
-//        if (restParallelism == 0) {
-//            log.info("run 集群中没有可用并行度,项目 " + projectId + " 继续等待。");
-//            return;
-//        }
-//        //2 判断剩余可用并行度是否大于项目并行度,否则继续等待
-//        if (restParallelism > 0L) {
-//            log.info("run 集群 " + clusterId + " 执行项目项目 " + projectId);
-//            projectMessageDTO.setCurrentParallelism(restParallelism);    // 设置实际的并行度
-//            projectConsumer.parseProject(projectMessageDTO, projectRunningKey);
 //        }
 //    }
-//
-//
-////    /**
-////     * 处理 pod 超时
-////     * 同时也可处理 pod 莫名关闭,因为关闭之后也会超时
-////     */
-////    @Scheduled(fixedDelay = 60 * 1000)
-////    @Transactional
-////    public void taskTimeout() {
-////        //1 查询任务信息(需要过滤已经删除的项目,因为页面上删除项目不会删除任务)
-////        List<TaskPO> executingTaskList = taskMapper.selectByRunStateAndProjectIsNotDeleted(DictConstants.TASK_RUNNING);
-////        if (CollectionUtil.isEmpty(executingTaskList)) {
-////            return;
-////        }
-////        log.info("taskTimeout 正在运行的任务有:" + executingTaskList);
-////        for (TaskPO task : executingTaskList) {
-////            String userId = task.getCreateUserId();
-////            String projectId = task.getPId();
-////            String taskId = task.getId();
-////            PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaskId(userId, projectId, taskId);
-////            // 获取心跳时间
-////            String tickTime = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskTickKey());
-////            if (StringUtil.isEmpty(tickTime)) {
-////                log.error(redisPrefix.getTaskTickKey() + ",该 key 的心跳时间为空!");
-////                continue;
-////            }
-////            long lastTickTime = Long.parseLong(tickTime);
-////            // 如果心跳超时则更改任务状态为 Aborted
-////            if (TimeUtil.getNow() - lastTickTime > Long.parseLong(kubernetesConfiguration.getPodTimeout())) {
-////                String podName = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskPodKey());
-////                taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
-////            }
-////        }
-////    }
-//
-//}
+
+}

+ 1 - 1
simulation-resource-video/src/main/java/com/css/simulation/resource/video/configuration/minio/MinioConfiguration.java → simulation-resource-video/src/main/java/com/css/simulation/resource/video/common/configuration/minio/MinioConfiguration.java

@@ -1,4 +1,4 @@
-package com.css.simulation.resource.video.configuration.minio;
+package com.css.simulation.resource.video.common.configuration.minio;
 
 import io.minio.MinioClient;
 import lombok.Data;

+ 1 - 1
simulation-resource-video/src/main/java/com/css/simulation/resource/video/util/MinioUtil.java → simulation-resource-video/src/main/java/com/css/simulation/resource/video/common/util/MinioUtil.java

@@ -1,4 +1,4 @@
-package com.css.simulation.resource.video.util;
+package com.css.simulation.resource.video.common.util;
 
 import io.minio.*;
 import io.minio.errors.*;

+ 7 - 7
simulation-resource-video/src/main/java/com/css/simulation/resource/video/service/VideoService.java

@@ -10,7 +10,7 @@ import com.css.simulation.resource.video.mapper.DictMapper;
 import com.css.simulation.resource.video.mapper.SimulationAutomaticProjectMapper;
 import com.css.simulation.resource.video.mapper.VehicleMapper;
 import com.css.simulation.resource.video.pojo.po.VehiclePO;
-import com.css.simulation.resource.video.util.MinioUtil;
+import com.css.simulation.resource.video.common.util.MinioUtil;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -37,9 +37,9 @@ import java.util.Iterator;
 @Slf4j
 public class VideoService {
     @Resource
-    VehicleMapper vehicleMapper;
+    private VehicleMapper vehicleMapper;
     @Resource
-    ConfigMapper configMapper;
+    private ConfigMapper configMapper;
     @Resource
     SimulationAutomaticProjectMapper simulationAutomaticProjectMapper;
     @Resource
@@ -150,7 +150,7 @@ public class VideoService {
         String execute = LinuxUtil.execute(command);    // 执行命令后生成 xosc 到目录下
         String oldXoscPath = csvDirectoryPath + oldXoscRelativePath;  // 生成文件的路径是固定的
         //2 调用修改xosc
-        return modifyXosc(oldXoscPath, xodrPath, osgbPath, projectId, projectType);  // 返回最新的xosc文件全路径
+        return modifyXosc(oldXoscPath, xodrPath, osgbPath, projectId, projectType);  // 返回最新的 xosc 文件全路径
     }
 
 
@@ -312,12 +312,12 @@ public class VideoService {
         } else if (projectType.equals("2")) {
             vehicle = simulationAutomaticProjectMapper.vehicleByZdId(projectId);
         }
-        log.info("VideoService--vehicleById 车辆配置 Id 为:" + vehicle);
+        log.info("vehicleById() 车辆配置 Id 为:" + vehicle);
         String vehicleId = configMapper.getVehicleId(vehicle);
 
-        log.info("VideoService--vehicleById 车辆 Id 为:" + vehicleId);
+        log.info("vehicleById() 车辆 Id 为:" + vehicleId);
         VehiclePO vehicleInfo = vehicleMapper.getVehicleInfo(vehicleId);
-        log.info("VideoService--vehicleById 车辆信息为:" + vehicleInfo);
+        log.info("vehicleById() 车辆信息为:" + vehicleInfo);
 
         //自车模型
     /*    String car = vehicleInfo.getVehicleType().substring(0, vehicleInfo.getVehicleType().indexOf(","));