
kafka: send task messages to specific partitions according to parallelism

root · 2 years ago · parent · commit fbb8cbbcb9

+ 7 - 0
api-common/src/main/java/api/common/util/CollectionUtil.java

@@ -4,6 +4,13 @@ import java.util.*;
 
 public class CollectionUtil {
 
+    public static <E> E[] createArray(E[] elements) {
+        return elements;
+    }
+
+    public static int[] createIntArray(int... elements) {
+        return elements;
+    }
 
     public static <T> List<T> arrayToList(T[] array) {
         return Arrays.asList(array);
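
The two new helpers simply hand back the array they are given; their point is letting a caller build an array inline. A minimal usage sketch (not part of the commit, names assume the methods added above) showing the single-element int[] counter pattern that sendTaskMessage relies on further down:

// Usage sketch: createIntArray builds a one-element, effectively-final int[]
// that a lambda can still mutate -- the counter pattern used in ProjectService below.
int[] messageNumber = CollectionUtil.createIntArray(0);
messageNumber[0] = messageNumber[0] + 1;   // counter advances to 1

// createArray just returns the passed-in array with its generic type preserved.
String[] names = CollectionUtil.createArray(new String[]{"scene-a", "scene-b"});
List<String> nameList = CollectionUtil.arrayToList(names);  // ["scene-a", "scene-b"]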

+ 5 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -210,18 +210,19 @@ public class ProjectConsumer {
             stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelismToUse) + "");
         });
        // Reset the parallelism actually used and save it to redis
-        projectMessageDTO.setCurrentParallelism(nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum());
+        int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
+        projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
         log.info("ProjectConsume--parseProject 项目 " + projectId + " 运行在:" + nodeMap);
         stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
-
-        Set<ScenePO> scenePOSet = new HashSet<>(scenePOList); // without deduplication, multiple scenes would end up repeatedly associated with multiple indicators
+        // Deduplicate; the indicators are filled in later when the messages are sent. Without deduplication, multiple scenes would end up repeatedly associated with multiple indicators.
+        Set<ScenePO> scenePOSet = new HashSet<>(scenePOList);
        // -------------------------------- 2 query models --------------------------------
        //2-1 get the model information and sensor information by vehicle configuration id vehicleConfigId
        VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // vehicle
        List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // camera
        List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // perfect sensor
        // -------------------------------- 3 send task messages --------------------------------
-        projectService.sendTaskMessage(projectRunningKey, userId, projectId, projectType, videoTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
+        projectService.sendTaskMessage(realCurrentParallelism, projectRunningKey, userId, projectId, projectType, videoTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
        // -------------------------------- 4 algorithm import (phase one implemented as a single-machine version) --------------------------------
         String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
        // -------------------------------- 5 create pods and start execution --------------------------------
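
Note that new HashSet&lt;&gt;(scenePOList) only removes duplicate scenes if ScenePO defines value equality. A minimal sketch of what that could look like, keyed on the scene id; this is an assumption for illustration, since ScenePO's actual equals/hashCode are not shown in this commit (the class may already get them from Lombok or elsewhere):

// Hypothetical sketch inside ScenePO (requires java.util.Objects):
// value equality on id, so the HashSet de-duplication behaves as the comment above intends.
@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof ScenePO)) return false;
    return Objects.equals(id, ((ScenePO) o).getId());
}

@Override
public int hashCode() {
    return Objects.hash(id);
}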

+ 0 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -58,7 +58,6 @@ public class TaskManager {
     String evaluationLevelUri;
     @Value("${scheduler.minio-path.project-result}")
     String resultPathMinio;
-
     @Value("${spring.kafka.hostname}")
     String hostnameKafka;
     @Value("${spring.kafka.username}")

+ 6 - 9
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -165,6 +165,7 @@ public class ProjectService {
 
 
     /**
+     * @param parallelism          degree of parallelism
      * @param projectRunningPrefix
      * @param userId
      * @param projectId
@@ -175,9 +176,9 @@ public class ProjectService {
      * @param ogtPOList
      */
     @SneakyThrows
-    public void sendTaskMessage( String projectRunningPrefix, String userId, String projectId, String projectType, Long videoTime, Set<ScenePO> scenePOSet, VehiclePO vehiclePO, List<CameraPO> cameraPOList, List<OgtPO> ogtPOList) {
+    public void sendTaskMessage(int parallelism, String projectRunningPrefix, String userId, String projectId, String projectType, Long videoTime, Set<ScenePO> scenePOSet, VehiclePO vehiclePO, List<CameraPO> cameraPOList, List<OgtPO> ogtPOList) {
 
-        final int[] messageNumber = {0};
+        final int[] messageNumber = CollectionUtil.createIntArray(0);
         log.info("ProjectService--sendTaskMessage 项目 " + projectId + " 获得的包括的场景信息为:" + scenePOSet);
         for (ScenePO scenePO : scenePOSet) {
             String sceneId = scenePO.getId();
@@ -263,21 +264,17 @@ public class ProjectService {
                 } catch (JsonProcessingException e) {
                     e.printStackTrace();
                 }
-                //4-5 use projectId as the topic name and send the task message to kafka
+                //4-5 use projectId as the topic name and spread the task messages across kafka partitions according to parallelism
                 String finalTaskJson = taskJson;
                 stringRedisTemplate.opsForValue().set(taskMessagePrefix, finalTaskJson);
-                kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
+                kafkaTemplate.send(projectId, messageNumber[0] % parallelism, "", taskJson).addCallback(success -> {
                    // topic the message was sent to
                     String topic = success.getRecordMetadata().topic();
                    // partition the message was sent to
                     int partition = success.getRecordMetadata().partition();
                    // offset of the message within the partition
                     long offset = success.getRecordMetadata().offset();
-                    log.info("------- ProjectConsumer message sent successfully:\n"
-                            + "topic: " + topic + "\n"
-                            + "partition: " + partition + "\n"
-                            + "offset: " + offset + "\n"
-                            + "message body: " + finalTaskJson);
+                    log.info("------- ProjectConsumer message sent successfully, topic: " + topic + ", partition: " + partition + ", offset: " + offset + ", message body: " + finalTaskJson);
                }, failure -> log.error("------- failed to send message: " + failure.getMessage()));
                 messageNumber[0] = messageNumber[0] + 1;
             });
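
Sending with kafkaTemplate.send(projectId, messageNumber[0] % parallelism, "", taskJson) round-robins the tasks over partitions 0..parallelism-1, so the topic named projectId must have at least parallelism partitions. This commit does not show where that topic is created; a minimal sketch under that assumption, using Spring Kafka's KafkaAdmin (createOrModifyTopics needs Spring Kafka 2.7+), with hypothetical names:

// Hypothetical helper (not part of this commit): make sure the per-project topic
// has `parallelism` partitions so every messageNumber[0] % parallelism maps to a real partition.
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.core.KafkaAdmin;

public class ProjectTopicHelper {
    public static void ensureProjectTopic(KafkaAdmin kafkaAdmin, String projectId, int parallelism) {
        // replication factor 1 is assumed here; adjust to the cluster's configuration
        kafkaAdmin.createOrModifyTopics(new NewTopic(projectId, parallelism, (short) 1));
    }
}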

+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -214,7 +214,7 @@ public class ProjectUtil {
            String restParallelismString = stringRedisTemplate.opsForValue().get("node:" + nodeName + ":parallelism");// key holding the node's remaining parallelism
             // -------------------------------- Comment --------------------------------
             int restParallelism;
-            if (restParallelismString == null) {    // if the remaining available parallelism has no value, this is the first query, so reset it to the preset maximum parallelism
+            if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) {    // if the remaining available parallelism has no value (first query) or exceeds the maximum, reset it to the preset maximum parallelism
                 restParallelism = maxParallelism;
                 stringRedisTemplate.opsForValue().set("node:" + nodeName + ":parallelism", restParallelism + "");
             } else {
@@ -247,7 +247,7 @@ public class ProjectUtil {
            String restParallelismString = stringRedisTemplate.opsForValue().get("node:" + nodeName + ":parallelism");// key holding the node's remaining parallelism
             // -------------------------------- Comment --------------------------------
             int restParallelism;
-            if (restParallelismString == null) {    // if the remaining available parallelism has no value, this is the first query, so reset it to the preset maximum parallelism
+            if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) {    // if the remaining available parallelism has no value (first query) or exceeds the maximum, reset it to the preset maximum parallelism
                 restParallelism = maxParallelism;
                 stringRedisTemplate.opsForValue().set("node:" + nodeName + ":parallelism", restParallelism + "");
             } else {
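
The widened condition also resets a stale Redis value that is larger than the configured maximum (for example after maxParallelism was lowered). A standalone sketch of the same decision with hypothetical names, just to make the rule explicit:

// Hypothetical helper mirroring the condition above: fall back to the configured
// maximum when Redis has no value yet, or holds a stale value that exceeds it.
static int resolveRestParallelism(String restParallelismString, int maxParallelism) {
    if (restParallelismString == null || Integer.parseInt(restParallelismString) > maxParallelism) {
        return maxParallelism;  // first query or oversized stale value: reset to the preset maximum
    }
    return Integer.parseInt(restParallelismString);
}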