martin hai 2 anos
pai
achega
7193213589

+ 28 - 32
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -12,10 +12,12 @@ import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.pojo.to.ScoreTO;
+import com.css.simulation.resource.scheduler.util.KubernetesUtil;
 import com.css.simulation.resource.scheduler.util.MinioUtil;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.kubernetes.client.openapi.ApiClient;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -76,20 +78,22 @@ public class TaskManager {
     RequestConfig requestConfig;
     @Autowired
     ProjectUtil projectUtil;
+    @Autowired
+    ApiClient apiClient;
 
     @SneakyThrows
     @Transactional
-    public boolean isProjectCompleted(PrefixTO redisPrefix, String projectId, String taskId, String state, String podName, ClientSession session) {
+    public boolean isProjectCompleted(PrefixTO redisPrefix, String projectId, String taskId, String state, String podName) {
         if ("Running".equals(state)) {
             // 将运行中的任务的 pod 名称放入 redis
             stringRedisTemplate.opsForValue().set(redisPrefix.getTaskPodKey(), podName);
             taskTick(taskId); // 刷新一下心跳
-            log.info("TaskManager--state 修改任务 " + taskId + "的状态为 Running,pod 名称为:" + podName);
+            log.info("TaskManager--state 修改任务 " + taskId + " 的状态为 " + state + ",pod 名称为:" + podName);
             taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
             return false;
         } else {
-            String podDeleteCommand = "kubectl delete pod " + podName;
-            log.info("TaskManager--state 修改任务 " + taskId + "的状态为:" + state + ",pod 名称为:" + podName + ",并执行删除 pod 命令:" + podDeleteCommand);
+//            String podDeleteCommand = "kubectl delete pod " + podName;
+            log.info("TaskManager--state 修改任务 " + taskId + "的状态为 " + state + ",pod 名称为:" + podName + ",并删除 pod。");
             if ("Aborted".equals(state)) {
                 if (retry(projectId, taskId, redisPrefix.getTaskRetryKey(), redisPrefix.getTaskMessageKey())) {
                     taskMapper.updateStateById(DictConstants.TASK_RUNNING, taskId);
@@ -131,7 +135,8 @@ public class TaskManager {
                 }
                 taskMapper.updateFailStateWithStopTime(taskId, state, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_4);
             }
-            SshUtil.execute(session, podDeleteCommand);
+//            SshUtil.execute(session, podDeleteCommand);
+            KubernetesUtil.deletePod(apiClient, "default", podName);
         }
         int taskNum = taskMapper.selectTaskNumByProjectId(projectId);
         int endTaskNum = taskMapper.selectEndTaskNumByProjectId(projectId);    // 查询已结束的任务 'Aborted', 'PendingAnalysis', 'Terminated'
@@ -143,35 +148,26 @@ public class TaskManager {
     }
 
     public boolean retry(String projectId, String taskId, String taskRetryKey, String taskMessageKey) {
-        log.info("TaskService--retry 重试操作收到的参数为:projectId=" + projectId + ",taskId=" + taskId);
-        //1 首先查看任务是否重试过 3 次
-        String retryString = stringRedisTemplate.opsForValue().get(taskRetryKey);
-        int retry = Integer.parseInt(Objects.requireNonNull(retryString));
-        //2 如果重试次数没有超过 3 次,则重试
-        if (retry > 3) {
+        try {
+            log.info("TaskManager--retry 重试操作收到的参数为:projectId=" + projectId + ",taskId=" + taskId);
+            //1 First read the task's retry counter from redis (stored as a decimal string under taskRetryKey)
+            String retryString = stringRedisTemplate.opsForValue().get(taskRetryKey);
+            int retry = Integer.parseInt(Objects.requireNonNull(retryString));
+            //2 Retry only while the stored counter is not greater than 3; otherwise give up
+            if (retry > 3) {
+                return false;
+            }
+            String taskJson = stringRedisTemplate.opsForValue().get(taskMessageKey);
+            retry++;
+            log.info("TaskService--retry 重试项目 " + projectId + " 的任务 " + taskId + ",重试次数为:" + retry + ",重新发送的消息为:" + taskJson);
+            stringRedisTemplate.opsForValue().set(taskRetryKey, retry + "");
+            kafkaTemplate.send(projectId, taskJson); // the project id is used as the topic; the send result is not inspected
+            return true;
+        } catch (Exception e) { // any failure (e.g. missing or non-numeric counter) is logged and reported as "not retried"
+            log.error("TaskManager--retry 重试操作报错:", e);
             return false;
         }
-        String taskJson = stringRedisTemplate.opsForValue().get(taskMessageKey);
-        retry++;
-        log.info("TaskService--retry 重试项目 " + projectId + " 的任务 " + taskId + ",重试次数为:" + retry + ",重新发送的消息为:" + taskJson);
-        stringRedisTemplate.opsForValue().set(taskRetryKey, retry + "");
-        kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
-            // 消息发送到的topic
-            assert success != null;
-            String topic = success.getRecordMetadata().topic();
-            // 消息发送到的分区
-            int partition = success.getRecordMetadata().partition();
-            // 消息在分区内的offset
-            long offset = success.getRecordMetadata().offset();
-            log.info("------- ProjectConsumer 发送消息成功:\n"
-                    + "主题 topic 为:" + topic + "\n"
-                    + "分区 partition 为:" + partition + "\n"
-                    + "偏移量为:" + offset + "\n"
-                    + "消息体为:" + taskJson);
-        }, failure -> {
-            log.error("------- 发送消息失败:" + failure.getMessage());
-        });
-        return true;
+
     }
 
     public void prepareScore(String projectRunningKey) {

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -65,7 +65,7 @@ public class TaskService {
         SshClient sshClient = SshUtil.getClient();
         ClientSession clientSession = SshUtil.getSession(sshClient, hostname, username, password);
         //1 判断项目是否已完成
-        boolean projectCompleted = taskManager.isProjectCompleted(redisPrefix, projectId, taskId, state, podName, clientSession);
+        boolean projectCompleted = taskManager.isProjectCompleted(redisPrefix, projectId, taskId, state, podName);
         if (!projectCompleted) {
             clientSession.close();
             sshClient.stop();

+ 16 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/KubernetesUtil.java

@@ -9,6 +9,7 @@ import io.kubernetes.client.openapi.models.V1JobList;
 import io.kubernetes.client.openapi.models.V1Namespace;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1PodList;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
 
@@ -17,9 +18,9 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class KubernetesUtil {
 
-
     public static void applyYaml(String hostname, String username, String password, String jobTemplateYamlPathTarget) throws IOException {
         SshClient client = SshUtil.getClient();
         ClientSession session = SshUtil.getSession(client, hostname, username, password);
@@ -101,6 +102,20 @@ public class KubernetesUtil {
     }
 
 
+    /**
+     * Deletes a pod through the Kubernetes core v1 API.
+     *
+     * @param apiClient     API client used to build the CoreV1Api
+     * @param namespaceName name of the namespace that contains the pod
+     * @param podName       name of the pod to delete
+     * @throws ApiException propagated from the deleteNamespacedPod call
+     */
+    public static void deletePod(ApiClient apiClient, String namespaceName, String podName) throws ApiException {
+        log.info("KubernetesUtil--deletePod 删除 " + namespaceName + ":" + podName);
+        CoreV1Api coreV1Api = new CoreV1Api(apiClient);
+        coreV1Api.deleteNamespacedPod(podName, namespaceName, null, null, null, null, null, null);
+    }
+
     public static List<String> getJob(ApiClient apiClient, String namespaceName) throws ApiException {
         BatchV1Api batchV1Api = new BatchV1Api(apiClient);
         V1JobList v1JobList;

+ 8 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -8,6 +8,7 @@ import com.css.simulation.resource.scheduler.pojo.po.UserPO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
@@ -25,6 +26,8 @@ public class ProjectUtil {
     UserMapper userMapper;
     @Autowired
     ClusterMapper clusterMapper;
+    @Autowired
+    KafkaTemplate<String, String> kafkaTemplate;
 
 
     public PrefixTO getRedisPrefixByUserIdAndProjectIdAndTaksId(String userId, String projectId, String taskId) {
@@ -58,7 +61,7 @@ public class ProjectUtil {
         return PrefixTO.builder()
                 .clusterPrefix(clusterPrefix)
                 .clusterRunningPrefix(clusterRunningPrefix)
-                .clusterWaitingPrefix(clusterPrefix)
+                .projectRunningKey(projectRunningKey)
                 .taskTickKey(taskTickKey)
                 .taskPodKey(taskPodKey)
                 .taskRetryKey(taskRetryKey)
@@ -131,5 +134,9 @@ public class ProjectUtil {
         return clusterRunningKeySet.stream().filter(key -> StringUtil.countSubString(key, ":") == 3).collect(Collectors.toList());
     }
 
+    public void sendMessage(String topic, String message) { // Sends message to the given topic via kafkaTemplate; the send result is discarded.
+        kafkaTemplate.send(topic, message);
+    }
+
 
 }