root 2 years ago
parent commit a350758dea

+ 49 - 0
api-common/src/main/java/api/common/util/FileUtil.java

@@ -259,6 +259,19 @@ public class FileUtil {
         return Objects.requireNonNull(directory.listFiles()).length;
     }
 
+    /**
+     * Count the files of a given type directly under a directory (non-recursive).
+     *
+     * @param directory directory path
+     * @param type      file type without the dot, e.g. txt
+     * @return number of matching files
+     */
+    public static long countByType(String directory, String type) {
+        // list() returns plain file names (not absolute paths) and null if the path is not a readable directory
+        String[] filenames = Objects.requireNonNull(new File(directory).list());
+        return Arrays.stream(filenames).filter(filename -> filename.endsWith("." + type)).count();
+    }
+
     /**
      * 计算目录包含的文件数量
      *
@@ -688,6 +701,42 @@ public class FileUtil {
         return result;
     }
 
+    /**
+     * Get the file name without its suffix, e.g. "/a/b/task-1.json" -> "task-1".
+     */
+    public static String getFilenameWithoutSuffix(String absolutePath) {
+        String filename = new File(absolutePath).getName();
+        int dotIndex = filename.lastIndexOf('.');
+        return dotIndex < 0 ? filename : filename.substring(0, dotIndex);
+    }
+
+    /**
+     * Recursively list the absolute paths of all files of the same type.
+     *
+     * @param rootPath root file or directory path
+     * @param type     file type without the dot, e.g. txt
+     * @return list of absolute paths of the matching files
+     */
+    @SneakyThrows
+    public static List<String> listAbsolutePathByType(String rootPath, String type) {
+        List<String> result = new LinkedList<>();
+        File rootFile = new File(rootPath);
+        //1 Check that the path exists
+        if (!rootFile.exists()) {
+            throw new RuntimeException("FileUtil.listAbsolutePathByType() 文件 " + rootPath + " 不存在!");
+        }
+        //2 If it is a single file of the given type, return it
+        if (rootFile.isFile() && rootFile.getName().endsWith(type)) {
+            return new ArrayList<>(Collections.singletonList(rootPath));
+        }
+        //3 If it is a directory, recurse into its children
+        if (rootFile.isDirectory()) {
+            File[] children = rootFile.listFiles();
+            if (children != null) {
+                for (File child : children) {
+                    result.addAll(listAbsolutePathByType(child.getAbsolutePath(), type));
+                }
+            }
+        }
+        return result;
+    }
+
     /**
      * 获取所有同一类型的文件列表。
      *

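Note: a minimal usage sketch of the three new FileUtil helpers; the directory path and the printed output are illustrative assumptions, not part of this commit.

import api.common.util.FileUtil;

import java.util.List;

public class FileUtilUsageSketch {
    public static void main(String[] args) {
        String dir = "/tmp/project/demo";                                       // hypothetical project directory
        long jsonCount = FileUtil.countByType(dir, "json");                     // non-recursive count of *.json directly under dir
        List<String> jsonFiles = FileUtil.listAbsolutePathByType(dir, "json");  // recursive absolute-path listing
        for (String path : jsonFiles) {
            // e.g. "/tmp/project/demo/task-1.json" -> "task-1"
            System.out.println(FileUtil.getFilenameWithoutSuffix(path));
        }
        System.out.println(jsonCount + " json file(s) directly under " + dir);
    }
}
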
+ 73 - 31
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -4,10 +4,12 @@ package com.css.simulation.resource.scheduler.consumer;
 import api.common.pojo.constants.DictConstants;
 import api.common.pojo.dto.ProjectMessageDTO;
 import api.common.util.*;
+import com.css.simulation.resource.scheduler.manager.ProjectManager;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.*;
 import com.css.simulation.resource.scheduler.service.ProjectService;
+import com.css.simulation.resource.scheduler.util.ApacheKafkaUtil;
 import com.css.simulation.resource.scheduler.util.MinioUtil;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -15,14 +17,18 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 @Component
 @Slf4j
@@ -69,6 +75,12 @@ public class ProjectConsumer {
     IndexMapper indexMapper;
     @Resource
     TaskMapper taskMapper;
+    @Resource
+    ProjectManager projectManager;
+    @Resource
+    KafkaTemplate<String, String> kafkaTemplate;
+    @Resource(name = "myKafkaAdmin")
+    Admin kafkaAdminClient;
 
 
     /**
@@ -95,7 +107,7 @@ public class ProjectConsumer {
         } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
             userId = autoSubProjectMapper.selectCreateUserById(projectId);
         }
-        String projectPath = linuxTempPath + projectId + "/";
+        String projectPath = linuxTempPath + "project/" + projectId + "/";
         FileUtil.mkdir(projectPath);
         // -------------------------------- 1 查询场景 --------------------------------
         //根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
@@ -154,7 +166,7 @@ public class ProjectConsumer {
                 String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + xoscSuffix;
                 MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
                 MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
-                log.info("ProjectService--sendTaskMessage 已经将 xodr 上传到 minio 的结果文件目录:" + xoscPathOfMinio);
+                log.info("ProjectService--sendTaskMessage 已经将 xosc 上传到 minio 的结果文件目录:" + xoscPathOfMinio);
 
                 String scenarioOdr = scenePO.getScenarioOdr();
                 String[] splitXodr = scenarioOdr.split("/");
@@ -226,6 +238,9 @@ public class ProjectConsumer {
             }
         }
 
+        // -------------------------------- 4 Start queueing --------------------------------
+        cacheProject(projectRecord);
+
     }
 
 
@@ -266,7 +281,7 @@ public class ProjectConsumer {
         if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
             log.info("ProjectConsumer--cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
             PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
-            run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
+            run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), userId);
             return;
         } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
             clusterPO = clusterMapper.selectByUserId(userId);
@@ -293,7 +308,7 @@ public class ProjectConsumer {
         int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
         // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
         if (currentParallelismSum + parallelism <= simulationLicenseNumber) {
-            run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
+            run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), userId);
         } else {
             wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
         }
@@ -305,7 +320,7 @@ public class ProjectConsumer {
      * @param projectRunningKey projectRunningKey
      * @param projectWaitingKey projectWaitingKey
      */
-    public void run(ProjectMessageDTO projectMessageDTO, String clusterId, String projectRunningKey, String projectWaitingKey) {
+    public void run(ProjectMessageDTO projectMessageDTO, String clusterId, String projectRunningKey, String projectWaitingKey, String userId) {
 
         String projectId = projectMessageDTO.getProjectId();
         int parallelism = projectMessageDTO.getParallelism();  // 期望并行度
@@ -316,7 +331,7 @@ public class ProjectConsumer {
             log.info("ProjectConsumer--run 集群 " + clusterId + " 执行项目 " + projectId);
             // 设置实际的并行度
             projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism));   // 设置实际的并行度
-            parseProject(projectMessageDTO, projectWaitingKey, projectRunningKey);
+            parseProject(projectMessageDTO, projectWaitingKey, projectRunningKey, userId);
         } else {
             log.info("ProjectConsumer--cacheManualProject 服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
             wait(projectWaitingKey, projectMessageDTO);
@@ -339,7 +354,7 @@ public class ProjectConsumer {
      * @param projectRunningKey projectRunningKey
      */
     @SneakyThrows
-    public void parseProject(ProjectMessageDTO projectMessageDTO, String projectWaitingKey, String projectRunningKey) {
+    public void parseProject(ProjectMessageDTO projectMessageDTO, String projectWaitingKey, String projectRunningKey, String userId) {
         String projectId = projectMessageDTO.getProjectId();    // 项目 id
         String projectType = projectMessageDTO.getType();   // 项目类型
         int currentParallelism = projectMessageDTO.getCurrentParallelism();   // 当前并行度
@@ -347,20 +362,13 @@ public class ProjectConsumer {
         long videoTime = projectMessageDTO.getMaxSimulationTime(); // 结果视频的时长
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
+        String projectPath = linuxTempPath + "project/" + projectId + "/";
         // -------------------------------- 0 准备 --------------------------------
         projectService.prepare(projectMessageDTO, projectWaitingKey, projectRunningKey);
-        String userId = null;
-        if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-            userId = manualProjectMapper.selectCreateUserById(projectId);
-        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-            userId = autoSubProjectMapper.selectCreateUserById(projectId);
-        }
-        // -------------------------------- 1 查询场景 --------------------------------
-        //1-1 根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
-        List<ScenePO> scenePOList = projectService.handlePackage(projectRunningKey, projectId, packageId);
-        int taskTotal = scenePOList.size();
-        projectMessageDTO.setTaskTotal(taskTotal);
-        projectMessageDTO.setTaskCompleted(0);
+        // -------------------------------- 1 Get the task json list --------------------------------
+        List<String> taskJsonList = FileUtil.listAbsolutePathByType(projectPath, "json");
+        int taskTotal = taskJsonList.size();
+
         // 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
         Map<String, Integer> nodeMap;
         if (currentParallelism < taskTotal) {
@@ -372,7 +380,8 @@ public class ProjectConsumer {
         nodeMap.keySet().forEach(nodeName -> {
             int parallelismToUse = nodeMap.get(nodeName);
             String restParallelismKey = "node:" + nodeName + ":parallelism";
-            int restParallelism = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(restParallelismKey)));// 剩余可用并行度
+            int restParallelism = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue()
+                    .get(restParallelismKey))); // remaining available parallelism
             stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelismToUse) + "");
         });
         // 重新设置实际使用的并行度并保存到 redis
@@ -380,23 +389,56 @@ public class ProjectConsumer {
         projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
         log.info("ProjectConsume--parseProject 项目 " + projectId + " 运行在:" + nodeMap);
         stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
-        //去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
-        Set<ScenePO> scenePOSet = new HashSet<>(scenePOList);
-        // -------------------------------- 2 查询模型 --------------------------------
-        //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
-        VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId);   // 车辆
-        List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
-        List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
         // -------------------------------- 3 算法导入 --------------------------------
         String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
         // -------------------------------- 4 发送任务消息 --------------------------------
-        projectService.sendTaskMessage(realCurrentParallelism, projectRunningKey, userId, projectId, projectType,
-                nodeMap, algorithmDockerImage, videoTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
+        List<NodeTO> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
+        final int[] messageNumber = CollectionUtil.createIntArray(0);
+        ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1);   // create the topic for this project
+        for (String taskJsonPath : taskJsonList) {
+            String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
+            //TODO Initialize the task retry count to 0 so the task can be retried at most 3 times.
+            String taskRetryKey = projectRunningKey + ":task:" + taskId + ":retry";
+            stringRedisTemplate.opsForValue().set(taskRetryKey, "0");
+            // Save the running task information
+            String taskMessageKey = projectRunningKey + ":task:" + taskId + ":message";
+            String taskJson = FileUtil.read(taskJsonPath);
+            stringRedisTemplate.opsForValue().set(taskMessageKey, taskJson);
 
+            //4-5 Use projectId as the topic name and spread the task messages across the partitions according to the parallelism
 
-        // -------------------------------- 5 创建 pod 开始执行 --------------------------------
-//        projectService.createPod(projectId, nodeMap, algorithmDockerImage);
+            kafkaTemplate.send(projectId, messageNumber[0] % realCurrentParallelism, "", taskJson)
+                    .addCallback(success -> {
+                        // topic the message was sent to
+                        String topic = success.getRecordMetadata().topic();
+                        // partition the message was sent to
+                        int partition = success.getRecordMetadata().partition();
+                        // offset of the message within the partition
+                        long offset = success.getRecordMetadata().offset();
+                        log.info("------- ProjectConsumer 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:"
+                                + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
+                        //4-6 On a successful send, create the pod yaml file and store its path in redis.
+                        // Pick the node with the smallest count; if the count is 0 the pod can start right away.
+                        AtomicReference<NodeTO> currentNode = new AtomicReference<>(null);
+                        AtomicInteger currentCount = new AtomicInteger(Integer.MAX_VALUE);
+                        nodeListToCount.forEach(nodeTO -> {
+                            int tempCount = nodeTO.getCount();
+                            if (tempCount < currentCount.get()) {
+                                currentCount.set(tempCount);
+                                currentNode.set(nodeTO);
+                            }
+                        });
+                        NodeTO chosenNode = currentNode.get();
+                        chosenNode.setCount(currentCount.get() + 1);    // only the chosen node is charged with one more pod
+                        projectManager.createTempYaml(projectId, algorithmDockerImage, chosenNode.getNodeName(),
+                                partition, offset, currentCount.get());
+                    }, failure -> log.error("------- 发送消息失败:" + failure.getMessage()));
+            messageNumber[0] = messageNumber[0] + 1;
 
+        }
+        log.info("ProjectService--sendTaskMessage 共发送了 " + messageNumber[0] + " 条消息!");
     }
 
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")

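Note: a self-contained sketch of the dispatch arithmetic used in parseProject above, i.e. round-robin partition selection plus picking the least-loaded node; the Node class, node names, and counts below are simplified stand-ins for NodeTO and illustrative only.

import java.util.ArrayList;
import java.util.List;

public class DispatchSketch {
    // Simplified stand-in for NodeTO: a node name plus the number of pods already assigned to it.
    static final class Node {
        final String name;
        int count;
        Node(String name, int count) { this.name = name; this.count = count; }
    }

    public static void main(String[] args) {
        int parallelism = 3;                                    // partitions created for the project topic
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node("node-1", 0));
        nodes.add(new Node("node-2", 0));
        for (int messageNumber = 0; messageNumber < 6; messageNumber++) {
            int partition = messageNumber % parallelism;        // same round-robin formula as the send loop
            Node least = nodes.get(0);
            for (Node node : nodes) {                           // pick the node with the smallest count
                if (node.count < least.count) {
                    least = node;
                }
            }
            least.count++;                                      // only the chosen node is charged with one more pod
            System.out.println("task " + messageNumber + " -> partition " + partition + " on " + least.name);
        }
    }
}
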
+ 8 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -197,10 +197,14 @@ public class TaskManager {
         indexMapper.deleteFirstByProjectId(projectId);
         indexMapper.deleteLastByProjectId(projectId);
         //1 查询场景包对应指标
-        String allIndexTemplateListJson = stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey() + ":package:" + packageId + ":all");
-        List<IndexTemplatePO> allIndexTemplateList = JsonUtil.jsonToList(allIndexTemplateListJson, IndexTemplatePO.class);
-        String leafIndexTemplateListJson = stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey() + ":package:" + packageId + ":leaf");
-        List<IndexTemplatePO> leafIndexTemplateList = JsonUtil.jsonToList(leafIndexTemplateListJson, IndexTemplatePO.class);
+//        String allIndexTemplateListJson = stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey() + ":package:" + packageId + ":all");
+        List<IndexTemplatePO> allIndexTemplateList = JsonUtil.jsonToList(
+                FileUtil.read(linuxTempPath + "project/" + projectId + "/all-index-list.json"), IndexTemplatePO.class);
+        String leafIndexTemplateListJson = stringRedisTemplate.opsForValue()
+                .get(redisPrefix.getProjectRunningKey() + ":package:" + packageId + ":leaf");
+//        List<IndexTemplatePO> leafIndexTemplateList = JsonUtil.jsonToList(leafIndexTemplateListJson, IndexTemplatePO.class);
+        List<IndexTemplatePO> leafIndexTemplateList = JsonUtil.jsonToList(
+                FileUtil.read(linuxTempPath + "project/" + projectId + "/leaf-index-list.json"), IndexTemplatePO.class);
         log.info("TaskManager--score 共有 " + leafIndexTemplateList.size() + "个叶子节点:" + leafIndexTemplateListJson);
         int maxLevel = 1; // 用于计算指标得分
         List<LeafIndexPO> leafIndexList = new ArrayList<>();

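Note: a hedged sketch of the new file-based hand-off — ProjectService writes the index-template lists as JSON and TaskManager reads them back instead of going through redis. The per-project path and the IndexTemplatePO package are assumptions based on this commit, and the empty list is a placeholder.

import api.common.util.FileUtil;
import api.common.util.JsonUtil;
import com.css.simulation.resource.scheduler.pojo.po.IndexTemplatePO;  // package assumed from the wildcard imports above

import java.util.ArrayList;
import java.util.List;

public class IndexListRoundTripSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical per-project path, mirroring projectPath + "leaf-index-list.json" in ProjectService.
        String leafJsonPath = "/tmp/project/demo-project-id/leaf-index-list.json";

        // Producer side (ProjectService.getSceneList): serialize the leaf index templates to a JSON file.
        List<IndexTemplatePO> leafIndexList = new ArrayList<>();                // would come from indexTemplateMapper
        FileUtil.writeStringToLocalFile(JsonUtil.listToJson(leafIndexList), leafJsonPath);

        // Consumer side (TaskManager.score): read the same file back instead of querying redis.
        List<IndexTemplatePO> reloaded = JsonUtil.jsonToList(FileUtil.read(leafJsonPath), IndexTemplatePO.class);
        System.out.println("reloaded " + reloaded.size() + " leaf index templates");
    }
}
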
+ 5 - 5
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -123,7 +123,7 @@ public class ProjectScheduler {
             }
             // -------------------------------- 项目没有执行说明等待中 --------------------------------
             if (isSystem) { // 系统管理员直接执行
-                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
+                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), userId);
                 return;
             }
             int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
@@ -141,21 +141,21 @@ public class ProjectScheduler {
                     }
                     if (parallelismSum < simulationLicenseNumber) { // 已经运行的项目小于集群允许运行的项目则,运行新的项目
                         if (parallelismSum + parallelism < simulationLicenseNumber) {
-                            run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
+                            run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), userId);
                             return;
                         }
                     }
                 }
             }
             if ((CollectionUtil.isEmpty(keySetOfAllProjectOfCluster) || CollectionUtil.isEmpty(keyListOfRunningProject)) && parallelism < simulationLicenseNumber) {
-                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), parallelism);
+                run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey(), userId);
             }
         }
     }
 
 
     @SneakyThrows
-    public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey, int parallelism) {
+    public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey, String userId) {
         ProjectMessageDTO projectMessageDTO;
         try {
             projectMessageDTO = JsonUtil.jsonToBean(RedisUtil.getStringByKey(stringRedisTemplate, projectWaitingKey), ProjectMessageDTO.class);
@@ -173,7 +173,7 @@ public class ProjectScheduler {
         if (restParallelism > 0L) {
             log.info("ProjectScheduler--run 集群 " + clusterId + " 执行项目项目 " + projectId);
             projectMessageDTO.setCurrentParallelism(restParallelism);    // 设置实际的并行度
-            projectConsumer.parseProject(projectMessageDTO, projectWaitingKey, projectRunningKey);
+            projectConsumer.parseProject(projectMessageDTO, projectWaitingKey, projectRunningKey, userId);
         }
     }
 

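Note: the gating logic above reduces to a license-capacity check; a minimal sketch with made-up numbers, not the scheduler's actual API.

public class CapacityCheckSketch {
    // A waiting project may start only if the parallelism already running in the cluster
    // plus the project's requested parallelism stays below the simulation license count.
    static boolean canRun(int parallelismSum, int requestedParallelism, int simulationLicenseNumber) {
        return parallelismSum + requestedParallelism < simulationLicenseNumber;
    }

    public static void main(String[] args) {
        System.out.println(canRun(4, 2, 8));   // true  -> the scheduler calls run(...)
        System.out.println(canRun(7, 2, 8));   // false -> the project keeps waiting
    }
}
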
+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -136,12 +136,12 @@ public class ProjectService {
     public List<ScenePO> getSceneList(String projectId, String packageId, String projectPath) {
         //1 查询该场景包的所有指标列表,包删了无所谓,但要过滤删掉的指标。并保存成 json 文件。
         List<IndexTemplatePO> allIndexList = indexTemplateMapper.selectByPackageIdIncludeDeleted(packageId);
-        FileUtil.writeStringToLocalFile(JsonUtil.listToJson(allIndexList), projectPath + "allIndexList.json");
+        FileUtil.writeStringToLocalFile(JsonUtil.listToJson(allIndexList), projectPath + "all-index-list.json");
 
         //2 查询场景包叶子指标
         List<IndexTemplatePO> leafIndexList = allIndexList.stream().filter(index -> StringUtil.isNotEmpty(index.getRuleId())).collect(Collectors.toList());
         log.info("ProjectService--handlePackage 项目 " + projectId + " 的叶子指标为:" + leafIndexList);
-        FileUtil.writeStringToLocalFile(JsonUtil.listToJson(leafIndexList), projectPath + "allIndexList.json");
+        FileUtil.writeStringToLocalFile(JsonUtil.listToJson(leafIndexList), projectPath + "leaf-index-list.json");
         List<ScenePO> sceneList = new ArrayList<>();
         leafIndexList.forEach(leafIndex -> {
             String naturalIds = leafIndex.getSceneNaturalIds();