martin 3 жил өмнө
parent
commit
68a4d46f75

+ 7 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -188,7 +188,7 @@ public class ManualProjectConsumer {
                     taskPO.setIsDeleted("0");
                     taskMapper.insert(taskPO);
                     // 心跳信息存在緩存中
-                    redisTemplate.opsForValue().set(manualProjectTopic + ":tick:" + projectId + ":" + taskId, TimeUtil.getNowString());
+                    redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick", TimeUtil.getNowString());
                     // 组装 task 消息
                     TaskTO taskTO = TaskTO.builder()
                             .info(InfoTO.builder()
@@ -233,7 +233,7 @@ public class ManualProjectConsumer {
                             .build();
 
                     //4-4 将对象转成 json
-                    String taskJson = null;
+                    String taskJson = "";
                     try {
                         taskJson = JsonUtil.beanToJson(taskTO);
                     } catch (JsonProcessingException e) {
@@ -258,6 +258,8 @@ public class ManualProjectConsumer {
                     }, failure -> {
                         log.error("------- 发送消息失败:" + failure.getMessage());
                     });
+
+                    redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":message", taskJson);
                 });
 
             } else {// 重新执行任务
@@ -266,7 +268,7 @@ public class ManualProjectConsumer {
                     String lastTargetId = taskMapper.selectLastTargetIdById(taskId);
                     taskMapper.updateStateById(DictConstants.TASK_PENDING, taskId);
                     // 心跳信息存在緩存中
-                    redisTemplate.opsForValue().set(manualProjectTopic + ":tick:" + projectId + ":" + taskId, TimeUtil.getNowString());
+                    redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick", TimeUtil.getNowString());
                     // 组装 task 消息
                     TaskTO taskTO = TaskTO.builder()
                             .info(InfoTO.builder()
@@ -311,7 +313,7 @@ public class ManualProjectConsumer {
                             .build();
 
                     //4-4 将对象转成 json
-                    String taskJson = null;
+                    String taskJson = "";
                     try {
                         taskJson = JsonUtil.beanToJson(taskTO);
                     } catch (JsonProcessingException e) {
@@ -336,6 +338,7 @@ public class ManualProjectConsumer {
                     }, failure -> {
                         log.error("------- ManualProjectConsumer 发送消息失败:" + failure.getMessage());
                     });
+                    redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":message", taskJson);
                 });
             }
         }

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -58,7 +58,7 @@ public class TickScheduler {
             for (TaskPO task : executingTaskList) {
                 String taskId = task.getId();
                 String projectId = task.getPId();
-                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":tick:" + projectId + ":" + taskId);
+                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":tick");
 //                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
                 assert s != null;
                 long tickTime = Long.parseLong(s);

+ 5 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/AlgorithmService.java

@@ -40,7 +40,11 @@ public class AlgorithmService {
         //3 获取文件列表中是否又 docker-entrypoint.sh
         boolean result = false;
         for (String path : pathList) {
-            if (path.contains("docker-entrypoint.sh")) {
+            if (
+                    path.contains("docker-entrypoint.sh")
+                            || path.contains("start_docker.sh")
+                            || path.contains("start_local.sh")
+            ) {
                 result = true;
                 break;
             }

+ 11 - 8
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -188,11 +188,14 @@ public class TaskService {
         projectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成
         log.info("结束项目的 job");
         SshUtil.execute(session, "kubectl delete job project-" + projectId);
-        SshClient clientKafka = SshUtil.getClient();
-        ClientSession sessionKafka = SshUtil.getSession(clientKafka, hostnameKafka, usernameKafka, passwordKafka);
-        String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", projectId);
-        SshUtil.execute(sessionKafka, topicDeleteCommand);
-        SshUtil.stop(clientKafka, sessionKafka);
+
+        // 删除 kafka topic
+//        SshClient clientKafka = SshUtil.getClient();
+//        ClientSession sessionKafka = SshUtil.getSession(clientKafka, hostnameKafka, usernameKafka, passwordKafka);
+//        String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", projectId);
+//        SshUtil.execute(sessionKafka, topicDeleteCommand);
+//        SshUtil.stop(clientKafka, sessionKafka);
+
         List<TaskPO> taskList = taskMapper.selectTaskListByProjectId(projectId);  // 所有任务信息
         int taskNumber = taskList.size();
         log.info("TaskService--state 共有 " + taskNumber + "个任务!");
@@ -244,15 +247,15 @@ public class TaskService {
                             log.info("TaskService--state 下载 minio 上的结果文件 " + runResultMinio + " 到本地:" + runResultLinux);
                             MinioUtil.downloadToFile(minioClient, bucketName, runResultMinio, runResultLinux);  // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
                         } catch (Exception e) {
-                            throw new RuntimeException("------- /state 下载 minio 上的结果文件出错:" + e.getMessage());
+                            throw new RuntimeException("------- TaskService--state 下载 minio 上的结果文件出错:" + e.getMessage());
                         }
                         try {
                             log.info("TaskService--state 开始执行打分命令:" + scoreCommand);
 //                            scoreResult = SshUtil.execute(sessionScore, command);
                             scoreResult = SshUtil.execute(session, scoreCommand);
-                            log.info("TaskService--state 打分结束,结果为:" + scoreResult);
+                            log.info("TaskService--state 项目" + projectId + "的任务" + task2Id + "打分结束,结果为:" + scoreResult);
                         } catch (IOException e) {
-                            throw new RuntimeException("------- /state 任务 " + task2Id + " 打分出错,命令为:" + scoreCommand + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
+                            throw new RuntimeException("------- TaskService--state 项目" + projectId + "的任务" + task2Id + " 打分出错,命令为:" + scoreCommand + " 修改状态为:" + DictConstants.TASK_ABORTED + "\n" + e.getMessage());
                         }
                     } catch (Exception e) {
                         task2.setRunState(DictConstants.TASK_ABORTED);