martin 3 yıl önce
ebeveyn
işleme
1692e1299a

+ 0 - 3
api-common/src/main/java/api/common/util/SshUtil.java

@@ -76,7 +76,6 @@ public class SshUtil {
      */
     public static String execute(ClientSession session, String command) throws IOException {
         String result;
-
         ChannelExec execChannel = session.createExecChannel(command);
         // 创建输出流
         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(2048);
@@ -87,8 +86,6 @@ public class SshUtil {
         // 结果写入
         result = byteArrayOutputStream.toString();
         byteArrayOutputStream.close();
-
-
         return result;
     }
 

+ 0 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -86,7 +86,6 @@ public class TickScheduler {
                 String key = manualProjectTopic + ":" + projectId + ":check";
                 String lastNowString = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":check");
                 String podList = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
-//                String podList = LinuxUtil.execute("kubectl get pod | grep project-" + projectId);
                 log.info("TickScheduler-------checkProject 项目 " + projectId + " 正在运行的 pod 为" + podList);
                 int taskNumber = StringUtil.countSubString(podList, "project");
                 if (StringUtil.isEmpty(lastNowString) && taskNumber == 0) {
@@ -102,11 +101,9 @@ public class TickScheduler {
                     if (now - lastNow > (long) 120 * 1000) {
                         redisTemplate.opsForValue().set(key, TimeUtil.getNowString());
                         SshUtil.execute(session, "kubectl delete job project-" + projectId);
-//                        LinuxUtil.execute("kubectl delete job project-" + projectId);
                         Thread.sleep(15000);
                         while (true) {
                             log.info("TickScheduler-------checkProject 准备重启项目 " + projectId);
-//                            String podList2 = LinuxUtil.execute("kubectl get pod | grep project-" + projectId);
                             String podList2 = SshUtil.execute(session, "kubectl get pod | grep project-" + projectId);
                             log.info("TickScheduler-------checkProject 项目 " + projectId + " 剩余的 pod 信息为:" + podList2);
                             int taskNumber2 = StringUtil.countSubString(podList2, "project");
@@ -118,7 +115,6 @@ public class TickScheduler {
                         log.info("TickScheduler-------checkProject 重新执行项目" + projectId);
                         String jobTemplateYamlPathTarget = jobYaml + "project-" + projectId + ".yaml";
                         SshUtil.execute(session, "kubectl apply -f " + jobTemplateYamlPathTarget);
-//                        LinuxUtil.execute("kubectl apply -f " + jobTemplateYamlPathTarget);
                     }
                 }
             } catch (IOException | InterruptedException e) {

+ 22 - 20
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -109,13 +109,16 @@ public class TaskService {
     public void taskState(String taskId, String state, String podName) {
 
         redisTemplate.opsForValue().set("podName:" + taskId, podName);
+        String podDeleteCommand = "kubectl delete pod " + podName;
+        SshClient client = SshUtil.getClient();
+        ClientSession session = SshUtil.getSession(client, "182.92.203.182", "root", "CICV2022test");
         if ("Running".equals(state)) {
             log.info("TaskService--state 修改任务 " + taskId + "的状态为 Running,pod 名称为:" + podName);
             taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
         } else if ("Aborted".equals(state)) {
-            log.info("TaskService--state 修改任务 " + taskId + "的状态为 Aborted,pod 名称为:" + podName);
-            LinuxUtil.execute("kubectl delete pod " + podName);
-//            taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
+            log.info("TaskService--state 修改任务 " + taskId + "的状态为 Aborted,pod 名称为:" + podName + ",并执行删除 pod 命令:" + podDeleteCommand);
+            SshUtil.execute(session, podDeleteCommand);
+//            taskManager.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql()); // 如果任务 abort 代表项目失败
             taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
             redisTemplate.delete("podName:" + taskId);
             //result-path-minio: /project/manual-project/
@@ -137,28 +140,23 @@ public class TaskService {
             String errorMessageString = errorMessage.get();
             return;
         } else if ("Terminated".equals(state)) {
-            String command = "kubectl delete pod " + podName;
             log.info("TaskService--state 修改任务 " + taskId + "的状态为 Terminated,pod 名称为:" + podName
-                    + ",并执行命令:" + command);
-            String resultTerminated = LinuxUtil.execute1(command);
+                    + ",并执行删除 pod 命令:" + podDeleteCommand);
+            SshUtil.execute(session, podDeleteCommand);
             taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
             redisTemplate.delete("podName:" + taskId);
         } else if ("PendingAnalysis".equals(state)) {
-            LinuxUtil.execute("kubectl delete pod " + podName);
+            log.info("TaskService--state 修改任务 " + taskId + "的状态为 PendingAnalysis,pod 名称为:" + podName
+                    + ",并执行删除 pod 命令:" + podDeleteCommand);
+            SshUtil.execute(session, podDeleteCommand);
             taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
             redisTemplate.delete("podName:" + taskId);
         } else {
+            log.error("TaskService--state 出现了未知状态:" + state);
             taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
             redisTemplate.delete("podName:" + taskId);
         }
         ProjectPO projectPO = projectMapper.selectById(taskId);
-//        while (StringUtil.isNotEmpty(taskId) && projectPO == null) {
-//            Thread.sleep(10000);
-//            projectPO = projectMapper.selectById(taskId);
-//        }
-        if (projectPO == null) {
-            return;
-        }
         String projectId = projectPO.getId();
         Set<String> keys = redisTemplate.keys("manualProject:" + projectId + "*");
         assert keys != null;
@@ -170,10 +168,13 @@ public class TaskService {
         projectMapper.updateTaskCompleted(projectId, endTaskNum);
         log.info("TaskService--state 项目 " + projectId + " 完成进度为:" + endTaskNum + "/" + taskNum);
         if (taskNum != endTaskNum) {  // 已结束任务数等于所有任务数量,才会准备打分;否则退出。
+            session.close();
+            client.stop();
             return;
         }
         projectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成
-        LinuxUtil.execute("kubectl delete job project-" + projectId);
+        log.info("结束项目的 job");
+        SshUtil.execute(session, "kubectl delete job project-" + projectId);
         SshClient clientKafka = SshUtil.getClient();
         ClientSession sessionKafka = SshUtil.getSession(clientKafka, hostnameKafka, usernameKafka, passwordKafka);
         String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", projectId);
@@ -222,7 +223,7 @@ public class TaskService {
 
 //                        python3 /home/ubuntu/test/Evaluate/main.py /home/ubuntu/test/test_data.csv 4 AEB_1-2
 //                        String command = "python3 " + pyPath + " " + runResultLinux + " " + task2.getSceneType();  // 默认使用场景名称找打分脚本
-                    String command = "python3 " + pyPath + "main.py " + runResultLinux + " " + task2.getSceneType() + " " + ruleName; // 指定打分脚本
+                    String scoreCommand = "python3 " + pyPath + "main.py " + runResultLinux + " " + task2.getSceneType() + " " + ruleName; // 指定打分脚本
                     String scoreResult;
                     try {
                         try {
@@ -232,12 +233,12 @@ public class TaskService {
                             throw new RuntimeException("------- /state 下载 minio 上的结果文件出错:" + e.getMessage());
                         }
                         try {
-                            log.info("TaskService--state 开始执行打分命令:" + command);
+                            log.info("TaskService--state 开始执行打分命令:" + scoreCommand);
 //                            scoreResult = SshUtil.execute(sessionScore, command);
-                            scoreResult = LinuxUtil.execute(command);
+                            scoreResult = SshUtil.execute(session, scoreCommand);
                             log.info("TaskService--state 打分结束,结果为:" + scoreResult);
                         } catch (IOException e) {
-                            throw new RuntimeException("------- /state 任务 " + task2Id + " 打分出错,命令为:" + command + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
+                            throw new RuntimeException("------- /state 任务 " + task2Id + " 打分出错,命令为:" + scoreCommand + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
                         }
                     } catch (Exception e) {
                         task2.setRunState(DictConstants.TASK_ABORTED);
@@ -335,7 +336,8 @@ public class TaskService {
         String post = HttpUtil.post(closeableHttpClient, requestConfig, evaluationLevelUri, headers, params);
         log.info("TaskService--state 访问仿真云平台评价等级接口:" + evaluationLevelUri + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
         log.info("TaskService--state 项目 " + projectId + " 打分完成");
-
+        session.close();
+        client.stop();
 
     }