root 2 lat temu
rodzic
commit
7a4326a4af

+ 9 - 14
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/scheduler/ClusterScheduler.java → simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/configuration/init/CustomApplicationRunner.java

@@ -1,14 +1,14 @@
-package com.css.simulation.resource.scheduler.web.scheduler;
+package com.css.simulation.resource.scheduler.common.configuration.init;
 
 import api.common.util.CollectionUtil;
 import com.css.simulation.resource.scheduler.common.configuration.kubernetes.KubernetesConfiguration;
 import com.css.simulation.resource.scheduler.common.util.KubernetesUtil;
 import com.css.simulation.resource.scheduler.service.domain.KubernetesNodeTO;
 import io.kubernetes.client.openapi.ApiClient;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
 import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
@@ -16,9 +16,7 @@ import java.util.List;
 
 @Component
 @Slf4j
-public class ClusterScheduler {
-
-
+public class CustomApplicationRunner implements ApplicationRunner {
     @Resource
     private KubernetesConfiguration kubernetesConfiguration;
     @Resource
@@ -26,13 +24,10 @@ public class ClusterScheduler {
     @Resource
     private StringRedisTemplate stringRedisTemplate;
 
-    /**
-     * 闲时重置所有并行度
-     */
-    @Scheduled(fixedDelay = 2 * 60 * 1000)
-    @SneakyThrows
-    public void initNodeParallelism() {
-        List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
+
+    @Override
+    public void run(ApplicationArguments args) {
+        List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList();
         List<String> podNameList = KubernetesUtil.getPod(apiClient, kubernetesConfiguration.getNamespace());
         if (CollectionUtil.isEmpty(podNameList)) {
             for (KubernetesNodeTO kubernetesNodeTO : initialNodeList) {
@@ -42,4 +37,4 @@ public class ClusterScheduler {
         }
 
     }
-}
+}

+ 0 - 16
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/configuration/init/MyApplicationRunner.java

@@ -1,16 +0,0 @@
-package com.css.simulation.resource.scheduler.common.configuration.init;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-public class MyApplicationRunner implements ApplicationRunner {
-
-    @Override
-    public void run(ApplicationArguments args) {
-        log.info("应用启动了。");
-    }
-}

+ 46 - 42
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/util/KubernetesUtil.java

@@ -97,50 +97,54 @@ public class KubernetesUtil {
         return v1NamespaceList.getItems().stream().map(ns -> Objects.requireNonNull(ns.getMetadata()).getName()).collect(Collectors.toList());
     }
 
-    public static List<String> getPod(ApiClient apiClient, String namespaceName) throws ApiException {
+    public static List<String> getPod(ApiClient apiClient, String namespaceName) {
 
-        CoreV1Api api = new CoreV1Api(apiClient);
-        V1PodList list;
-        if (namespaceName == null || "".equals(namespaceName) || "default".equals(namespaceName)) {
-            list = api.listNamespacedPod(
-                    "default",
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null);
-        } else if ("all".equals(namespaceName)) {
-            list = api.listPodForAllNamespaces(
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null);
-        } else {
-            list = api.listNamespacedPod(
-                    namespaceName,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null);
+        try {
+            CoreV1Api api = new CoreV1Api(apiClient);
+            V1PodList list;
+            if (namespaceName == null || "".equals(namespaceName) || "default".equals(namespaceName)) {
+                list = api.listNamespacedPod(
+                        "default",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+            } else if ("all".equals(namespaceName)) {
+                list = api.listPodForAllNamespaces(
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+            } else {
+                list = api.listNamespacedPod(
+                        namespaceName,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+            }
+            return list.getItems().stream().map(pod -> Objects.requireNonNull(pod.getMetadata()).getName()).collect(Collectors.toList());
+        } catch (ApiException e) {
+            throw new RuntimeException(e);
         }
-        return list.getItems().stream().map(pod -> Objects.requireNonNull(pod.getMetadata()).getName()).collect(Collectors.toList());
     }
 
     /**

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/manager/TaskManager.java

@@ -482,7 +482,7 @@ public class TaskManager {
     }
 
     public void taskTick(String taskId) {
-        log.info("taskTick() 任务 " + taskId + " 心跳。");
+        log.info("收到任务 " + taskId + " 的心跳。");
         TaskPO taskPO = taskMapper.selectById(taskId);
         String projectId = taskPO.getPId();
         String userId = taskPO.getCreateUserId();

+ 1 - 72
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/scheduler/ProjectScheduler.java

@@ -1,15 +1,11 @@
 package com.css.simulation.resource.scheduler.web.scheduler;
 
-import api.common.pojo.dto.ProjectMessageDTO;
 import api.common.util.CollectionUtil;
-import api.common.util.JsonUtil;
 import com.css.simulation.resource.scheduler.common.util.ProjectUtil;
 import com.css.simulation.resource.scheduler.common.util.RedisUtil;
 import com.css.simulation.resource.scheduler.web.consumer.ProjectConsumer;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
@@ -20,14 +16,6 @@ import java.util.List;
 @Component
 @Slf4j
 public class ProjectScheduler {
-
-    @Value("${scheduler.host.hostname}")
-    private String hostname;
-    @Value("${scheduler.host.username}")
-    private String username;
-    @Value("${scheduler.host.password}")
-    private String password;
-    // -------------------------------- Comment --------------------------------
     @Resource
     private StringRedisTemplate stringRedisTemplate;
     @Resource
@@ -35,15 +23,14 @@ public class ProjectScheduler {
     @Resource
     private ProjectUtil projectUtil;
 
-
     /**
      * 调度项目启动
      */
     @Scheduled(fixedDelay = 60 * 1000)
     public void dispatchProject() {
-        log.info("//1 查询等待中的项目任务。");
         List<String> projectMessageKeys = projectUtil.getWaitingProjectMessageKeys();
         if (!CollectionUtil.isEmpty(projectMessageKeys)) {
+            log.info("尝试启动等待中的项目:" + projectMessageKeys);
             for (String projectMessageKey : projectMessageKeys) {
                 final String projectMessage = RedisUtil.getStringByKey(stringRedisTemplate, projectMessageKey);
                 ConsumerRecord<String, String> projectRecord = new ConsumerRecord<>("", 0, 0L, null, projectMessage);
@@ -52,62 +39,4 @@ public class ProjectScheduler {
         }
     }
 
-
-    @SneakyThrows
-    public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey) {
-        ProjectMessageDTO projectMessageDTO;
-        try {
-            projectMessageDTO = JsonUtil.jsonToBean(RedisUtil.getStringByKey(stringRedisTemplate, projectWaitingKey), ProjectMessageDTO.class);
-        } catch (Exception e) {
-            log.info("run 等待执行的项目信息已经从 redis 删除。");
-            return;
-        }
-        //1 获取所有节点的剩余可用并行度
-        int restParallelism = projectUtil.getRestParallelism();
-        if (restParallelism == 0) {
-            log.info("run 集群中没有可用并行度,项目 " + projectId + " 继续等待。");
-            return;
-        }
-        //2 判断剩余可用并行度是否大于项目并行度,否则继续等待
-        if (restParallelism > 0L) {
-            log.info("run 集群 " + clusterId + " 执行项目项目 " + projectId);
-            projectMessageDTO.setCurrentParallelism(restParallelism);    // 设置实际的并行度
-            projectConsumer.parseProject(projectMessageDTO, projectRunningKey);
-        }
-    }
-
-
-//    /**
-//     * 处理 pod 超时
-//     * 同时也可处理 pod 莫名关闭,因为关闭之后也会超时
-//     */
-//    @Scheduled(fixedDelay = 60 * 1000)
-//    @Transactional
-//    public void taskTimeout() {
-//        //1 查询任务信息(需要过滤已经删除的项目,因为页面上删除项目不会删除任务)
-//        List<TaskPO> executingTaskList = taskMapper.selectByRunStateAndProjectIsNotDeleted(DictConstants.TASK_RUNNING);
-//        if (CollectionUtil.isEmpty(executingTaskList)) {
-//            return;
-//        }
-//        log.info("taskTimeout 正在运行的任务有:" + executingTaskList);
-//        for (TaskPO task : executingTaskList) {
-//            String userId = task.getCreateUserId();
-//            String projectId = task.getPId();
-//            String taskId = task.getId();
-//            PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaskId(userId, projectId, taskId);
-//            // 获取心跳时间
-//            String tickTime = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskTickKey());
-//            if (StringUtil.isEmpty(tickTime)) {
-//                log.error(redisPrefix.getTaskTickKey() + ",该 key 的心跳时间为空!");
-//                continue;
-//            }
-//            long lastTickTime = Long.parseLong(tickTime);
-//            // 如果心跳超时则更改任务状态为 Aborted
-//            if (TimeUtil.getNow() - lastTickTime > Long.parseLong(kubernetesConfiguration.getPodTimeout())) {
-//                String podName = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskPodKey());
-//                taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
-//            }
-//        }
-//    }
-
 }