Ver código fonte

传 partition 和 offset

root 2 anos atrás
pai
commit
9fc05aca2e

+ 12 - 0
api-common/src/main/java/api/common/util/FileUtil.java

@@ -597,6 +597,18 @@ public class FileUtil {
         return result;
     }
 
+    public static void deleteFileBySubstring(String parentDirectoryPath,String substringOfFilename){
+        File[] files = new File(parentDirectoryPath).listFiles();
+        if(files == null || files.length == 0){
+            return;
+        }
+        for (File file : files) {
+            if (file.getName().contains(substringOfFilename)){
+                boolean delete = file.delete();
+            }
+        }
+    }
+
     /**
      * 获取所有文件列表。
      *

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -223,7 +223,7 @@ public class ProjectConsumer {
         // -------------------------------- 4 发送任务消息 --------------------------------
         projectService.sendTaskMessage(realCurrentParallelism, projectRunningKey, userId, projectId, projectType, nodeMap, algorithmDockerImage, videoTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
-        projectService.createPod(projectId, nodeMap, algorithmDockerImage);
+//        projectService.createPod(projectId, nodeMap, algorithmDockerImage);
 
     }
 

+ 7 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/ProjectManager.java

@@ -36,7 +36,8 @@ public class ProjectManager {
                                String algorithmDockerImage,
                                String nodeName,
                                int kafkaPartition,
-                               long kafkaOffset
+                               long kafkaOffset,
+                               int count
 
     ) {
         String podName = projectUtil.getRandomPodName(projectId);   // 生成 podName
@@ -47,8 +48,8 @@ public class ProjectManager {
         String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
         String replace3 = replace2.replace("algorithm-image", algorithmDockerImage);
         String replace4 = replace3.replace("kafkaTopic", projectId);     // 消息主题名称为 projectId
-        String replace5 = replace4.replace("kafkaPartition", kafkaPartition + "");     // 消息主题名称为 projectId
-        String replace6 = replace5.replace("kafkaOffset", kafkaOffset + "");     // 消息主题名称为 projectId
+        String replace5 = replace4.replace("kafka-partition", "\"" + kafkaPartition + "\"");     // 消息主题名称为 projectId
+        String replace6 = replace5.replace("kafka-offset", "\"" + kafkaOffset + "\"");     // 消息主题名称为 projectId
         String replace7 = replace6.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
         String replace8 = replace7.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
         String replace9 = replace8.replace("node-name", nodeName);     // 指定 pod 运行节点
@@ -56,5 +57,8 @@ public class ProjectManager {
         FileUtil.writeStringToLocalFile(replace9, podYamlDirectory + podYaml);
 //        log.info("ProjectService--createPod 在节点 " + nodeName + " 开始执行 pod。");
 //        projectUtil.createPod(nodeName, podName, tempPodString);
+        if (count == 0) {
+            projectUtil.createPod2(nodeName, podYamlDirectory + podYaml);
+        }
     }
 }

+ 2 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -227,7 +227,8 @@ public class TaskManager {
                 try {
                     String runState = taskOfLeaf.getRunState();
                     log.info("TaskManager--score 任务 " + task2Id + " 的运行状态为:" + runState);
-                    if (DictConstants.TASK_ANALYSIS.equals(runState)) {
+                    // 加上 running 时因为事务原因,初期没有考虑到
+                    if (DictConstants.TASK_ANALYSIS.equals(runState) || DictConstants.TASK_RUNNING.equals(runState)) {
                         taskMapper.updateSuccessStateWithStopTime(task2Id, DictConstants.TASK_ANALYSING, TimeUtil.getNowForMysql());
                         // 计算每个任务的得分
                         String result1OfMinio = taskOfLeaf.getRunResultFilePath() + "/Ego.csv";

+ 15 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/to/NodeTO.java

@@ -0,0 +1,15 @@
+package com.css.simulation.resource.scheduler.pojo.to;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class NodeTO {
+    private String nodeName;
+    private Integer count;
+}

+ 33 - 7
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -33,6 +33,8 @@ import java.io.InputStream;
 import java.security.InvalidKeyException;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 @Service
@@ -198,7 +200,7 @@ public class ProjectService {
                                 VehiclePO vehiclePO,
                                 List<CameraPO> cameraPOList,
                                 List<OgtPO> ogtPOList) {
-        Map<String, Integer> nodeMapToCount = projectUtil.getNodeMapToCount(nodeMap);
+        List<NodeTO> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
         final int[] messageNumber = CollectionUtil.createIntArray(0);
         ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, parallelism, (short) 1);   // 创建主题
         log.info("ProjectService--sendTaskMessage 项目 " + projectId + " 获得的包括的场景信息为:" + scenePOSet);
@@ -312,7 +314,6 @@ public class ProjectService {
                 //4-5 将 projectId 作为 topic 名称,根据 parallelism 分散发送 task 信息到 kafka
                 String finalTaskJson = taskJson;
                 stringRedisTemplate.opsForValue().set(taskMessagePrefix, finalTaskJson);
-                List<PartitionTO> partitionDTOList = new ArrayList<>();
                 kafkaTemplate.send(projectId, messageNumber[0] % parallelism, "", taskJson).addCallback(success -> {
                     // 消息发送到的topic
                     String topic = success.getRecordMetadata().topic();
@@ -322,11 +323,21 @@ public class ProjectService {
                     long offset = success.getRecordMetadata().offset();
                     log.info("------- ProjectConsumer 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset + " 消息体为:" + finalTaskJson);
                     //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
-                    // 选一个count最少的node
-                    String leastCountNodeName = projectUtil.getLeastCountNodeOfNodeMapToCount(nodeMapToCount);
-                    projectManager.createTempYaml(projectId, algorithmDockerImage, leastCountNodeName, partition, offset);
-
-
+                    // 选一个count 最少的node,如果 count 是 0 则直接启动。
+                    AtomicReference<String> currentNodeName = new AtomicReference<>("");
+                    AtomicInteger currentCount = new AtomicInteger(Integer.MAX_VALUE);
+                    nodeListToCount.forEach(nodeTO -> {
+                        int tempCount = nodeTO.getCount();
+                        String tempNodeName = nodeTO.getNodeName();
+                        if (tempCount < currentCount.get()) {
+                            currentCount.set(tempCount);
+                            currentNodeName.set(tempNodeName);
+                            nodeTO.setCount(tempCount + 1);
+                        }
+                    });
+                    String currentNodeNameValue = currentNodeName.get();
+                    int currentCountValue = currentCount.get();
+                    projectManager.createTempYaml(projectId, algorithmDockerImage, currentNodeNameValue, partition, offset, currentCountValue);
                 }, failure -> log.error("------- 发送消息失败:" + failure.getMessage()));
                 messageNumber[0] = messageNumber[0] + 1;
             });
@@ -489,6 +500,18 @@ public class ProjectService {
         });
     }
 
+    /**
+     * 运行 pod
+     *
+     * @param projectId            项目id
+     * @param nodeMap              并行度
+     * @param algorithmDockerImage 算法镜像
+     */
+    @SneakyThrows
+    public void createPod2(String projectId, Map<String, Integer> nodeMap, String algorithmDockerImage) {
+
+    }
+
 
     /**
      * @param projectId   手动项目 id 或自动项目子id
@@ -516,6 +539,9 @@ public class ProjectService {
         PrefixTO redisPrefix = projectUtil.getRedisPrefixByProjectIdAndProjectType(projectId, projectType);
         RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getProjectRunningKey());
         RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getProjectWaitingKey());
+
+        //4 删除所有临时文件 pod-yaml
+        FileUtil.deleteFileBySubstring(podYamlDirectory, projectId);
     }
 
 

+ 33 - 23
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -11,6 +11,7 @@ import com.css.simulation.resource.scheduler.mapper.UserMapper;
 import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.UserPO;
 import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
+import com.css.simulation.resource.scheduler.pojo.to.NodeTO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import io.kubernetes.client.openapi.ApiClient;
 import lombok.SneakyThrows;
@@ -22,8 +23,8 @@ import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.io.File;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -60,24 +61,23 @@ public class ProjectUtil {
     @Resource
     ApiClient apiClient;
 
-
-    public synchronized String getLeastCountNodeOfNodeMapToCount(Map<String, Integer> nodeMapToCount) {
-        AtomicReference<String> result = new AtomicReference<>();
-        result.set("");
-        int tempCount = 1;
-        nodeMapToCount.forEach((nodeName, count) -> {
-            if (count < tempCount) {
-                result.set(nodeName);
+    @SneakyThrows
+    public void deleteYamlByProjectId(String projectId){
+        List<String> absolutePathList = FileUtil.listAbsolutePath(podYamlDirectory);
+        for (String absolutePath : absolutePathList) {
+            if(absolutePath.contains(projectId)){
+                boolean delete = new File(absolutePath).delete();
             }
-        });
-        return result.get();
+        }
     }
 
-    public Map<String, Integer> getNodeMapToCount(Map<String, Integer> nodeMap) {
-        Map<String, Integer> result = new HashMap<>();
+
+
+    public List<NodeTO> getNodeListToCount(Map<String, Integer> nodeMap) {
+        List<NodeTO> result = new ArrayList<>();
         nodeMap.forEach((nodeName, parallelism) -> {
             for (int i = 0; i < parallelism; i++) {
-                result.put(nodeName, 0);
+                result.add(new NodeTO(nodeName, 0));
             }
         });
         return result;
@@ -165,19 +165,21 @@ public class ProjectUtil {
     @SneakyThrows
     public void createNextPod2(String projectId, String nodeName, String lastPodName) {
         deletePod(lastPodName);
-
         //1 删除上一个 pod 和 redis 键值对 和 旧的 yaml 文件
         deletePod(lastPodName);
-        List<String> list1 = FileUtil.listAbsolutePath(podYamlDirectory);
-        list1.forEach(absolutePath -> {
+        List<String> list = FileUtil.listAbsolutePath(podYamlDirectory);
+        Iterator<String> iterator1 = list.iterator();
+        while (iterator1.hasNext()){
+            String absolutePath = iterator1.next();
             if (absolutePath.contains(nodeName) && absolutePath.contains(lastPodName)) {
                 FileUtil.rm(absolutePath);
+                list.remove(absolutePath);
+                break;
             }
-        });
-        List<String> list2 = FileUtil.listAbsolutePath(podYamlDirectory);
-        for (String absolutePath : list2) {
-            if (absolutePath.contains(nodeName) && absolutePath.contains(lastPodName)) {
-                createPod2(absolutePath);
+        }
+        for (String absolutePath : list) {
+            if (absolutePath.contains(nodeName) && absolutePath.contains(projectId)) {
+                createPod2(projectId, absolutePath);
                 log.info("ProjectUtil--createNextPod 创建项目 " + projectId + " 的下一个 pod。");
                 return;
             }
@@ -206,14 +208,19 @@ public class ProjectUtil {
     }
 
     /**
+     * @param nodeName       节点名称
      * @param podYamlPath pod 文件内容
      */
     @SneakyThrows
-    public void createPod2(String podYamlPath) {
+    public void createPod2(String nodeName,  String podYamlPath) {
+        String podName = podYamlPath.split("#")[1].split("\\.")[0];
+        stringRedisTemplate.opsForValue().set("pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
+        KubernetesUtil.createNs(apiClient, kubernetesConfiguration.getNamespace());
         KubernetesUtil.applyYaml(hostname, username, password, podYamlPath);
     }
 
 
+
     public String getProjectTypeByProjectId(String projectId) {
         String projectType = null;
         ProjectPO manualProjectPO = manualProjectMapper.selectById(projectId);
@@ -552,6 +559,9 @@ public class ProjectUtil {
     public void addOneParallelismToNode(String nodeName) {
         String key = "node:" + nodeName + ":parallelism";
         String parallelismString = stringRedisTemplate.opsForValue().get(key);
+        if(StringUtil.isEmpty(parallelismString)){
+            throw new RuntimeException("ProjectUtil--addOneParallelismToNode redisKey " + key + " 为空。");
+        }
         int parallelism = Integer.parseInt(parallelismString);
         stringRedisTemplate.opsForValue().set(key, (parallelism + 1) + "");
         log.info("ProjectUtil--addOneParallelismToNode 归还并行度从 " + parallelism + " 加一。");