root 2 년 전
부모
커밋
b369770964

+ 36 - 34
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -20,14 +20,17 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -424,7 +427,7 @@ public class ProjectConsumer {
         int messageNumber = 0;
         ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1);   // 创建主题
         // 需要即时启动的任务(并行度的大小)
-        List<String> yamlListToRun = new ArrayList<>(realCurrentParallelism);
+        CopyOnWriteArrayList<String> yamlListToRun = new CopyOnWriteArrayList<>();
         for (String taskJsonPath : taskJsonList) {
             String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
             //TODO 设置任务重试次数为 0,方便任务进行最大3次的重试。
@@ -436,49 +439,48 @@ public class ProjectConsumer {
             stringRedisTemplate.opsForValue().set(taskMessageKey, taskJson);
 
             //4-5 将 projectId 作为 topic 名称,根据 parallelism 分散发送 task 信息到 kafka
-            kafkaTemplate.send(projectId, messageNumber % currentParallelism, "", taskJson)
-                    .addCallback(success -> {
-                        // 消息发送到的topic
-                        String topic = success.getRecordMetadata().topic();
-                        // 消息发送到的分区
-                        int partition = success.getRecordMetadata().partition();
-                        // 消息在分区内的offset
-                        long offset = success.getRecordMetadata().offset();
-                        log.info("------- ProjectConsumer 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:"
-                                + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
-                        //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
-                        // 选一个count 最少的node,如果 count 是 0 则直接启动。
-                        AtomicReference<String> currentNodeName = new AtomicReference<>("");
-                        AtomicInteger currentCount = new AtomicInteger(Integer.MAX_VALUE);
-                        nodeListToCount.forEach(nodeTO -> {
-                            int tempCount = nodeTO.getCount();
-                            String tempNodeName = nodeTO.getNodeName();
-                            if (tempCount < currentCount.get()) {
-                                currentCount.set(tempCount);
-                                currentNodeName.set(tempNodeName);
-                                nodeTO.setCount(tempCount + 1);
-                            }
-                        });
-                        String currentNodeNameValue = currentNodeName.get();
-                        int currentCountValue = currentCount.get();
-                        String tempYaml = projectManager.createTempYaml(projectId, algorithmDockerImage
-                                , currentNodeNameValue, partition, offset, currentCountValue);
-                        if (currentCountValue == 0) {
-                            yamlListToRun.add(tempYaml);
-                        }
-                    }, failure -> log.error("------- 发送消息失败:" + failure.getMessage()));
+            SendResult<String, String> stringStringSendResult = kafkaTemplate.send(projectId, messageNumber % currentParallelism, taskId, taskJson).get();
+            RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
+            String topic = recordMetadata.topic();  // 消息发送到的topic
+            int partition = recordMetadata.partition(); // 消息发送到的分区
+            long offset = recordMetadata.offset();  // 消息在分区内的offset
+            log.info("------- ProjectConsumer 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:"
+                    + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
+            //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
+            // 选一个count 最少的node,如果 count 是 0 则直接启动。
+            AtomicReference<String> currentNodeName = new AtomicReference<>("");
+            AtomicInteger currentCount = new AtomicInteger(Integer.MAX_VALUE);
+            nodeListToCount.forEach(nodeTO -> {
+                int tempCount = nodeTO.getCount();
+                String tempNodeName = nodeTO.getNodeName();
+                if (tempCount < currentCount.get()) {
+                    currentCount.set(tempCount);
+                    currentNodeName.set(tempNodeName);
+                    nodeTO.setCount(tempCount + 1);
+                }
+            });
+            String currentNodeNameValue = currentNodeName.get();
+            int currentCountValue = currentCount.get();
+            String tempYaml = projectManager.createTempYaml(projectId, algorithmDockerImage
+                    , currentNodeNameValue, partition, offset, currentCountValue);
+            if (currentCountValue == 0) {
+                yamlListToRun.add(tempYaml);
+            }
             messageNumber++;
         }
-        log.info("ProjectService--sendTaskMessage 共发送了 " + messageNumber + " 条消息!");
+        Thread.sleep(10);
+        log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息。");
+        log.info("项目 " + projectId + " 准备首先启动 " + yamlListToRun);
         for (String yaml : yamlListToRun) {
             projectUtil.createPod2(yaml);
         }
+        log.info("项目 " + projectId + " 已经启动 " + yamlListToRun);
     }
 
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")
     @SneakyThrows
     public void stopProject(ConsumerRecord<String, String> stopRecord) {
-        log.info("ProjectConsumer--stopManualProject 接收到的项目终止消息为:" + stopRecord);
+        log.info("接收到的项目终止消息为:" + stopRecord);
         //1 读取 kafka 的项目停止信息
         /*
             {

+ 0 - 13
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/to/TaskMessageTO.java

@@ -1,13 +0,0 @@
-package com.css.simulation.resource.scheduler.pojo.to;
-
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class TaskMessageTO {
-}

+ 2 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
@@ -211,6 +212,7 @@ public class ProjectUtil {
      * @param podYamlPath pod 文件内容
      */
     @SneakyThrows
+    @Async
     public void createPod2(String podYamlPath) {
         String nodeName = new File(podYamlPath).getName().split("#")[0];
         String podName = podYamlPath.split("#")[1].split("\\.")[0];