martin před 3 roky
rodič
revize
c2545a95f7

+ 4 - 0
api-common/src/main/java/api/common/util/CollectionUtil.java

@@ -33,6 +33,10 @@ public class CollectionUtil {
         return collection == null || collection.isEmpty();
     }
 
+    public static boolean isEmpty(ArrayList<?> arrayList) { // null-safe emptiness check for an ArrayList argument; NOTE(review): the preceding overload has the same body for its `collection` parameter - confirm this one is needed
+        return arrayList == null || arrayList.isEmpty(); // true for a null reference or a list with no elements
+    }
+
     public static boolean isEmpty(Map<?, ?> map) {
         return map == null || map.isEmpty();
     }

+ 28 - 29
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -1,29 +1,28 @@
-//package com.css.simulation.resource.scheduler.manager;
-//
-//import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-//import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
-//import org.apache.ibatis.session.ExecutorType;
-//import org.apache.ibatis.session.SqlSession;
-//import org.apache.ibatis.session.SqlSessionFactory;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.stereotype.Component;
-//
-//import java.util.List;
-//
-//@Component
-//public class TaskManager {
-//
-//    @Autowired
-//    private SqlSessionFactory sqlSessionFactory;
-//
-//    public void batchUpdateByScoreResult(List<TaskPO> taskList) {
-//
-//        try(SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false)){
-//            TaskMapper taskMapper = sqlSession.getMapper(TaskMapper.class);
-//            for (TaskPO taskPO : taskList) {
-//                taskMapper.updateByScoreResult(taskPO);
-//            }
-//            sqlSession.commit();
-//        }
-//    }
-//}
+package com.css.simulation.resource.scheduler.manager;
+
+import api.common.pojo.constants.DictConstants;
+import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+import org.apache.ibatis.session.ExecutorType;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.sql.Timestamp;
+
+@Component
+public class TaskManager { // Spring component that runs several TaskMapper updates inside one MyBatis session
+
+    @Autowired
+    private SqlSessionFactory sqlSessionFactory; // injected; a new session is opened from it on every call
+
+    public void updateFailStateWithStopTime(String taskId, String runState, Timestamp runStopTime) { // passes runState/runStopTime for the task to TaskMapper.updateFailStateWithStopTime, then sets the owning project's state to DictConstants.PROJECT_TERMINATED
+
+        try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) { // BATCH executor with auto-commit off; try-with-resources closes the session
+            TaskMapper taskMapper = sqlSession.getMapper(TaskMapper.class);
+            taskMapper.updateFailStateWithStopTime(taskId, runState, runStopTime);
+            taskMapper.updateProjectStateByTaskId(DictConstants.PROJECT_TERMINATED, taskId);
+            sqlSession.commit(); // reached only when both mapper calls returned without throwing
+        }
+    }
+}

+ 8 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -6,6 +6,7 @@ import org.apache.ibatis.annotations.*;
 import org.apache.ibatis.type.JdbcType;
 
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -38,6 +39,12 @@ public interface TaskMapper {
     List<TaskPO> selectTaskListByProjectId(@Param("projectId") String projectId);
 
 
+    @Update("update simulation_manual_project\n" + // sets now_run_state on the project row that owns the given task
+            "set now_run_state = #{state}\n" +
+            "where id = (select p_id from simulation_manual_project_task where id = #{taskId})") // the project id is resolved from the task row's p_id
+    void updateProjectStateByTaskId( @Param("state") String state, @Param("taskId") String taskId); // state binds to #{state}, taskId binds to #{taskId}
+
+
     @Update("update simulation_manual_project_task\n" +
             "set run_state = #{runState},run_start_time = #{runStartTime}\n" +
             "where id = #{id}")
@@ -86,7 +93,7 @@ public interface TaskMapper {
             "where smpt.is_deleted = '0'\n" +
             "  and smp.is_deleted = '0'\n" +
             "  and smpt.run_state in ('Running')")
-    List<TaskPO> selectExecuting();
+    ArrayList<TaskPO> selectExecuting();
 
     @Select("select id\n" +
             "from simulation_manual_project_task\n" +

+ 41 - 37
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -1,14 +1,14 @@
 package com.css.simulation.resource.scheduler.scheduler;
 
 import api.common.pojo.constants.DictConstants;
-import api.common.util.*;
+import api.common.util.LinuxUtil;
+import api.common.util.StringUtil;
+import api.common.util.TimeUtil;
 import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.service.TaskService;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.sshd.client.SshClient;
-import org.apache.sshd.client.session.ClientSession;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
@@ -16,6 +16,7 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 @Component
@@ -40,45 +41,45 @@ public class TickScheduler {
     @Scheduled(fixedDelay = 60 * 1000)
     public void tick() {
 
-        List<TaskPO> executingTaskList = taskMapper.selectExecuting();
-        if (CollectionUtil.isEmpty(executingTaskList)) {
-            return;
-        }
-//        log.info("------- TickScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
-        //2 根据 key 查出任务的心跳时间
-        executingTaskList.forEach(task -> {
-            String taskId = task.getId();
-            String projectId = task.getPId();
-            try {
-                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId);
+        ArrayList<TaskPO> executingTaskList = taskMapper.selectExecuting(); // heartbeat watchdog: loads the tasks whose run_state is 'Running' and checks each one below
+        if (executingTaskList != null && executingTaskList.size() > 0) { // nothing to do when the query returns null or no rows
+            //        log.info("------- TickScheduler 查询出所有执行中的任务('Running'):" + executingTaskList);
+            //2 look up each task's heartbeat time by its key
+            executingTaskList.forEach(task -> {
+                String taskId = task.getId();
+                String projectId = task.getPId();
+                try {
+                    String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId); // Redis value stored under <manualProjectTopic>:<projectId>:<taskId>
 //                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
-                assert s != null;
-                long tickTime = Long.parseLong(s);
-                long timeout = 2 * 60 * 1000L;
-                long now = TimeUtil.getNow();
-                long difference = now - tickTime;
+                    assert s != null; // NOTE(review): only checked when the JVM runs with -ea; otherwise a missing key reaches Long.parseLong(null)
+                    long tickTime = Long.parseLong(s); // presumably epoch milliseconds, since it is subtracted from TimeUtil.getNow() - confirm
+                    long timeout = 2 * 60 * 1000L; // 120000; presumably milliseconds (two minutes) - confirm the unit of TimeUtil.getNow()
+                    long now = TimeUtil.getNow();
+                    long difference = now - tickTime; // age of the last heartbeat
 //                log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + difference);
-                if (difference > timeout) {
-                    log.info("------- TickScheduler 任务" + taskId + "已超时,状态修改为:" + DictConstants.TASK_ABORTED);
-                    String podName = redisTemplate.opsForValue().get("podName:" + taskId);
-                    if (podName != null) {
-                        taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
+                    if (difference > timeout) { // heartbeat is older than the timeout
+                        log.info("------- TickScheduler 任务" + taskId + "已超时,状态修改为:" + DictConstants.TASK_ABORTED);
+                        String podName = redisTemplate.opsForValue().get("podName:" + taskId); // pod name is read from Redis under podName:<taskId>
+                        if (podName != null) { // the state change is skipped when no pod name is stored
+                            taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
+                        }
                     }
+                } catch (Exception e) {
+                    throw new RuntimeException(e.getMessage()); // NOTE(review): keeps only the message (cause and stack trace are dropped) and the throw ends the forEach, so later tasks are not checked in this run
                 }
-            } catch (Exception e) {
-                throw new RuntimeException(e.getMessage());
-            }
-        });
+            });
+        }
+
     }
 
 
     /**
      * 检查如果有 job 在运行但是 pod 全部关闭的情况,此时需要重启一下 job
      */
-    @Scheduled(fixedDelay = 60 * 1000)
+    @Scheduled(fixedDelay = 30 * 1000)
     public void checkProject() throws IOException {
-        SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, "182.92.203.182", "root", "CICV2022test");
+//        SshClient client = SshUtil.getClient();
+//        ClientSession session = SshUtil.getSession(client, "182.92.203.182", "root", "CICV2022test");
 
         //1 查询出正在运行中的 project
         List<String> projectIdList = projectMapper.selectIdByState("20");
@@ -91,7 +92,9 @@ public class TickScheduler {
 
             try {
                 String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
-                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+//                String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+                String podList = LinuxUtil.execute("kubectl get pod | grep project-" + projectId);
+                log.info("TickScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为" + podList);
                 int taskNumber = StringUtil.countSubString(podList, "project");
                 if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
                     redisTemplate.opsForValue().set(key, nowString);
@@ -102,12 +105,13 @@ public class TickScheduler {
                     //3 如果 pod 为空,则重启 job
                     long lastNow = Long.parseLong(lastNowString);
                     long now = Long.parseLong(nowString);
-                    if (now - lastNow > 2L * 60 * 1000) {
+                    if (now - lastNow > (long) 60 * 1000) {
+//                        SshUtil.execute(session,"kubectl delete job project-" + projectId);
                         LinuxUtil.execute("kubectl delete job project-" + projectId);
-                        Thread.sleep(30000);
+                        Thread.sleep(15000);
                         while (true) {
                             log.info("TickScheduler-------checkProject 准备重启项目 " + projectId);
-                            String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
+                            String podList2 = LinuxUtil.execute("kubectl get pod | grep project-" + projectId);
                             log.info("TickScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:" + podList2);
                             int taskNumber2 = StringUtil.countSubString(podList2, "project");
                             if (taskNumber2 == 0) {
@@ -124,8 +128,8 @@ public class TickScheduler {
             }
         });
 
-        session.close();
-        client.stop();
+//        session.close();
+//        client.stop();
 
     }
 }

+ 4 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -3,6 +3,7 @@ package com.css.simulation.resource.scheduler.service;
 import api.common.pojo.constants.DictConstants;
 import api.common.util.*;
 import com.css.simulation.resource.scheduler.manager.TaskIndexManager;
+import com.css.simulation.resource.scheduler.manager.TaskManager;
 import com.css.simulation.resource.scheduler.mapper.IndexTemplateMapper;
 import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskIndexMapper;
@@ -59,6 +60,8 @@ public class TaskService {
     @Autowired
     TaskIndexManager taskIndexManager;
     @Autowired
+    TaskManager taskManager;
+    @Autowired
     TaskIndexMapper taskIndexMapper;
     @Autowired
     IndexTemplateMapper indexTemplateMapper;
@@ -110,6 +113,7 @@ public class TaskService {
             taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
         } else if ("Aborted".equals(state) || "Terminated".equals(state)) {
             LinuxUtil.execute("kubectl delete pod " + podName);
+//            taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
             taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
             redisTemplate.delete("podName:" + taskId);
             return;