martin 2 lat temu
rodzic
commit
111c17a84f

+ 4 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -48,7 +48,7 @@ public class ProjectConsumer {
     // -------------------------------- Comment --------------------------------
 
     @Resource
-    StringRedisTemplate redisTemplate;
+    StringRedisTemplate stringRedisTemplate;
     @Resource
     ManualProjectMapper manualProjectMapper;
     @Resource
@@ -125,7 +125,7 @@ public class ProjectConsumer {
         int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
         // 获取该集群中正在运行的项目,如果没有则立即执行
         PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
-        Set<String> clusterRunningKeySet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
+        Set<String> clusterRunningKeySet = stringRedisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
         List<String> runningProjectSet;
         if (CollectionUtil.isEmpty(clusterRunningKeySet)) {
             run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
@@ -139,7 +139,7 @@ public class ProjectConsumer {
         // 计算正在运行的项目的并行度总和
         long parallelismSum = 0;
         for (String projectKey : runningProjectSet) {
-            String projectJsonTemp = redisTemplate.opsForValue().get(projectKey);
+            String projectJsonTemp = stringRedisTemplate.opsForValue().get(projectKey);
             ProjectMessageDTO projectMessageTemp = JsonUtil.jsonToBean(projectJsonTemp, ProjectMessageDTO.class);
             parallelismSum += projectMessageTemp.getParallelism();
         }
@@ -170,7 +170,7 @@ public class ProjectConsumer {
 
     public void wait(String clusterId, String projectId, String projectWaitingKey, String projectJson) {
         log.info("ProjectConsumer--wait 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
-        redisTemplate.opsForValue().set(projectWaitingKey, projectJson);
+        stringRedisTemplate.opsForValue().set(projectWaitingKey, projectJson);
     }
 
 

+ 3 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/TaskController.java

@@ -9,12 +9,14 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.annotation.Resource;
+
 @RefreshScope
 @RestController
 @RequestMapping("/task")
 public class TaskController {
 
-    @Autowired
+    @Resource
     TaskService taskService;
 
     // -------------------------------- Comment --------------------------------

+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskIndexManager.java

@@ -5,15 +5,15 @@ import com.css.simulation.resource.scheduler.pojo.po.LeafIndexPO;
 import org.apache.ibatis.session.ExecutorType;
 import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.List;
 
 @Component
 public class TaskIndexManager {
 
-    @Autowired
+    @Resource
     private SqlSessionFactory sqlSessionFactory;
 
     public void batchInsertLeafIndex(List<LeafIndexPO> leafTaskIndexList) {

+ 16 - 78
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -21,7 +21,6 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
@@ -44,7 +43,7 @@ public class ProjectScheduler {
     String jobYaml;
     // -------------------------------- Comment --------------------------------
     @Resource
-    StringRedisTemplate redisTemplate;
+    StringRedisTemplate stringRedisTemplate;
     @Resource
     TaskService taskService;
     @Resource
@@ -58,12 +57,8 @@ public class ProjectScheduler {
     @Resource
     ApiClient apiClient;
     @Resource
-    KafkaTemplate<String, String> kafkaTemplate;
-    @Resource
     ProjectConsumer projectConsumer;
     @Resource
-    UserMapper userMapper;
-    @Resource
     ProjectUtil projectUtil;
 
 
@@ -95,13 +90,13 @@ public class ProjectScheduler {
             String clusterId = clusterPO.getId();
             PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
             // --------------------------------  判断项目是否已经在执行,如果执行则 continue --------------------------------
-            if (StringUtil.isNotEmpty(redisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()))) {
+            if (StringUtil.isNotEmpty(stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()))) {
                 continue;
             }
             // -------------------------------- 项目没有执行说明等待中 --------------------------------
             int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
             // 获取该用户正在运行的项目数量
-            Set<String> clusterRunningKeySet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
+            Set<String> clusterRunningKeySet = stringRedisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
             List<String> runningProjectSet = null;
             // cluster:${clusterId}:running:${projectId}
             if (CollectionUtil.isNotEmpty(clusterRunningKeySet)) {
@@ -110,7 +105,7 @@ public class ProjectScheduler {
                     log.info("ProjectScheduler--dispatchProject 运行中的项目的 key 有:" + runningProjectSet);
                     long parallelismSum = 0;
                     for (String runningProjectKey : runningProjectSet) {
-                        parallelismSum += JsonUtil.jsonToBean(redisTemplate.opsForValue().get(runningProjectKey), ProjectMessageDTO.class).getParallelism();
+                        parallelismSum += JsonUtil.jsonToBean(stringRedisTemplate.opsForValue().get(runningProjectKey), ProjectMessageDTO.class).getParallelism();
                     }
                     if (parallelismSum < simulationLicenseNumber) {
                         if (parallelismSum + parallelism < simulationLicenseNumber) {
@@ -128,7 +123,7 @@ public class ProjectScheduler {
 
 
     public void run(String clusterId, String projectId, String projectType, String projectWaitingKey, String projectRunningKey, long parallelism) {
-        String projectJson = redisTemplate.opsForValue().get(projectWaitingKey);
+        String projectJson = stringRedisTemplate.opsForValue().get(projectWaitingKey);
         if (StringUtil.isEmpty(projectJson)) {
             log.error("ProjectScheduler--run 项目 " + projectId + " 的开始消息查询失败,key 为:" + projectWaitingKey);
             return;
@@ -165,7 +160,7 @@ public class ProjectScheduler {
                 String taskId = task.getId();
                 PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaksId(userId, projectId, taskId);
                 // 获取心跳时间
-                String tickTime = redisTemplate.opsForValue().get(redisPrefix.getTaskTickKey());
+                String tickTime = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskTickKey());
                 if (StringUtil.isEmpty(tickTime)) {
                     log.error(redisPrefix.getTaskTickKey() + ",该 key 的心跳时间为空!");
                     continue;
@@ -173,7 +168,7 @@ public class ProjectScheduler {
                 long lastTickTime = Long.parseLong(tickTime);
                 // 如果心跳超时则更改任务状态为 Aborted
                 if (TimeUtil.getNow() - lastTickTime > timeout) {
-                    String podName = redisTemplate.opsForValue().get(redisPrefix.getTaskPodKey());
+                    String podName = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskPodKey());
                     taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
                 }
             }
@@ -194,21 +189,21 @@ public class ProjectScheduler {
             String projectId = project.getId();
             String userId = project.getCreateUserId();
             PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectId(userId, projectId);
-            String lastNowString = redisTemplate.opsForValue().get(redisPrefix.getProjectCheckKey());
-            // 获取正在运行的 pod 列表
-            List<String> podList = KubernetesUtil.getPod(apiClient, "");
-            int taskNumber = podList.size();
+            String lastNowString = stringRedisTemplate.opsForValue().get(redisPrefix.getProjectCheckKey());
+            // 获取该项目的 job 中正在运行的 pod 数量
+            List<String> allPodList = KubernetesUtil.getPod(apiClient, "");
+            long taskNumber = allPodList.stream().filter(podName -> podName.contains(projectId)).count();
             // 如果没有检查过且 pod 列表为空,则正式开始检查,设置第一次检查时间
-            if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
+            if (StringUtil.isEmpty(lastNowString) && taskNumber == 0L) {
                 log.info("ProjectScheduler--projectCheck 开始检查项目 " + projectId);
-                redisTemplate.opsForValue().set(redisPrefix.getProjectCheckKey(), TimeUtil.getNowString());
+                stringRedisTemplate.opsForValue().set(redisPrefix.getProjectCheckKey(), TimeUtil.getNowString());
                 return;
             }
-            log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
+            log.info("ProjectScheduler--projectCheck kubernetes 的命名空间 default 中正在运行的 pod 有:" + allPodList + ",其中项目 " + projectId + " 的任务个数为:" + taskNumber);
             //  如果两次检查时间超过了 2 分钟,且仍然没有 pod 执行,则准备重启
-            if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0 && Long.parseLong(TimeUtil.getNowString()) - Long.parseLong(lastNowString) > (long) 120 * 1000) {
+            if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0L && Long.parseLong(TimeUtil.getNowString()) - Long.parseLong(lastNowString) > (long) 120 * 1000) {
                 // 删除检查
-                redisTemplate.delete(redisPrefix.getProjectCheckKey());
+                stringRedisTemplate.delete(redisPrefix.getProjectCheckKey());
                 try {
                     // 删除 job
                     KubernetesUtil.deleteJob(apiClient, "default", "project-" + projectId);
@@ -220,61 +215,4 @@ public class ProjectScheduler {
             }
         }
     }
-
-
-//    /**
-//     * 解决 pod 莫名全部关闭但是 job 还在的问题
-//     * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
-//     */
-//    @Scheduled(fixedDelay = 30 * 1000)
-//    @SneakyThrows
-//    public void projectCheck() {
-//        SshClient client = SshUtil.getClient();
-//        ClientSession session = SshUtil.getSession(client, hostname, username, password);
-//
-//        //1 查询出正在运行中的 project
-//        List<ProjectPO> projectIdList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
-//        log.info("ProjectScheduler--projectCheck 运行中的项目有:" + projectIdList);
-//        //2 根据 projectId 获取 pod
-//        for (ProjectPO project : projectIdList) {
-//            String projectId = project.getId();
-//            String userId = project.getCreateUserId();
-//            PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectId(userId, projectId);
-//            String checkKey = redisPrefix.getProjectCheckKey();
-//            String lastNowString = redisTemplate.opsForValue().get(checkKey);
-//            String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-//            log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 正在运行的 pod 为:\n" + podList);
-//            int taskNumber = StringUtil.countSubString(podList, "project");
-//            if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {  // 为空代表第一次,先设置时间
-//                redisTemplate.opsForValue().set(checkKey, TimeUtil.getNowString());
-//            }
-//            if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0) {  // 非空则开始检查
-//                // 判断两次是否超过2分钟
-//                //3 如果 pod 为空,则重启 job
-//                long lastNow = Long.parseLong(lastNowString);
-//                long now = Long.parseLong(TimeUtil.getNowString());
-//                if (now - lastNow > (long) 120 * 1000) {
-//                    redisTemplate.opsForValue().set(checkKey, TimeUtil.getNowString());
-//                    SshUtil.execute(session, "kubectl delete job project-" + projectId);
-//                    Thread.sleep(15000);
-//                    while (true) {
-//                        log.info("ProjectScheduler--projectCheck 准备重启项目 " + projectId);
-//                        String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-//                        log.info("ProjectScheduler--projectCheck 项目 " + projectId + " 剩余的 pod 信息为:\n" + podList2);
-//                        int taskNumber2 = StringUtil.countSubString(podList2, "project");
-//                        if (taskNumber2 == 0) {
-//                            break;
-//                        }
-//                    }
-//                    Thread.sleep(15000);
-//                    log.info("ProjectScheduler--projectCheck 重新执行项目" + projectId);
-//                    String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
-//                    SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
-//                }
-//            }
-//        }
-//        session.close();
-//        client.stop();
-//
-//    }
 }

+ 3 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/AlgorithmService.java

@@ -10,14 +10,15 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
 import java.io.InputStream;
 
 @Service
 public class AlgorithmService {
 
-    @Autowired
+    @Resource
     AlgorithmMapper algorithmMapper;
-    @Autowired
+    @Resource
     MinioClient minioClient;
     @Value("${minio.bucket-name}")
     String bucketName;