root há 2 anos atrás
pai
commit
94160e31cd

+ 19 - 10
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/ProjectManager.java

@@ -3,6 +3,7 @@ package com.css.simulation.resource.scheduler.manager;
 import api.common.pojo.constants.DictConstants;
 import api.common.util.FileUtil;
 import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
+import com.css.simulation.resource.scheduler.configuration.minio.MinioConfiguration;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -20,9 +21,13 @@ public class ProjectManager {
     String podTemplateYaml;
     @Value("${scheduler.linux-path.pod-yaml-directory}")
     String podYamlDirectory;
+    @Value("${spring.kafka.bootstrap-servers}")
+    String kafkaIp;
     @Resource
     KubernetesConfiguration kubernetesConfiguration;
     @Resource
+    MinioConfiguration minioConfiguration;
+    @Resource
     ProjectUtil projectUtil;
 
 
@@ -47,23 +52,27 @@ public class ProjectManager {
         String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
         String replace1 = replace0.replace("algorithm-container", "algorithm-" + projectId);
         String replace2 = replace1.replace("algorithm-image", algorithmDockerImage);
-        String replace3 = replace2.replace("kafkaTopic", projectId);     // 消息主题名称为 projectId
-        String replace4 = replace3.replace("kafka-partition", "\"" + kafkaPartition + "\"");     // 消息主题名称为 projectId
-        String replace5 = replace4.replace("kafka-offset", "\"" + kafkaOffset + "\"");     // 消息主题名称为 projectId
-        String replace6 = replace5.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
-        String replace7 = replace6.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
-        String replace8 = replace7.replace("node-name", nodeName);     // 指定 pod 运行节点
+        String replace3 = replace2.replace("kafka-topic", projectId);     // 消息主题名称为 projectId
+        String replace4 = replace3.replace("kafka-ip", kafkaIp);
+        String replace5 = replace4.replace("kafka-partition", "\"" + kafkaPartition + "\"");
+        String replace6 = replace5.replace("kafka-offset", "\"" + kafkaOffset + "\"");
+        String replace7 = replace6.replace("minio-ip", minioConfiguration.getEndpoint());
+        String replace8 = replace7.replace("minio-access-key", minioConfiguration.getAccessKey());
+        String replace9 = replace8.replace("minio-secret-key", minioConfiguration.getSecretKey());
+        String replace10 = replace9.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
+        String replace11 = replace10.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
+        String replace12 = replace11.replace("node-name", nodeName);     // 指定 pod 运行节点
 
         String finalYaml = null;
         if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
             log.info("项目 " + projectId + " 使用 gpu 生成视频");
-            String replace9 = replace8.replace("vtd-image", kubernetesConfiguration.getVtdImageUseGpu());
-            finalYaml = replace9.replace("vtd-command", kubernetesConfiguration.getVtdCommandUseGpu());
+            String replace13 = replace12.replace("vtd-image", kubernetesConfiguration.getVtdImageUseGpu());
+            finalYaml = replace13.replace("vtd-command", kubernetesConfiguration.getVtdCommandUseGpu());
         }
         if (DictConstants.NOT_USE_GPU.equals(isChoiceGpu)) {
             log.info("项目 " + projectId + " 不使用 gpu 生成视频");
-            String replace9 = replace8.replace("vtd-image", kubernetesConfiguration.getVtdImageNotUseGpu());
-            finalYaml = replace9.replace("vtd-command", kubernetesConfiguration.getVtdCommandNotUseGpu());
+            String replace13 = replace12.replace("vtd-image", kubernetesConfiguration.getVtdImageNotUseGpu());
+            finalYaml = replace13.replace("vtd-command", kubernetesConfiguration.getVtdCommandNotUseGpu());
         }
         log.info("保存项目 " + projectId + " 的 yaml 文件:" + finalYaml);
         FileUtil.writeStringToLocalFile(finalYaml, podYamlDirectory + podYaml);

+ 4 - 218
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -162,220 +162,6 @@ public class ProjectService {
         return sceneList;
     }
 
-    @SneakyThrows
-    @Transactional
-    public List<ScenePO> handlePackage(String projectRunningKey, String projectId, String packageId) {
-        String allIndexPrefix = projectRunningKey + ":package:" + packageId + ":all";
-        String leafIndexPrefix = projectRunningKey + ":package:" + packageId + ":leaf";
-
-        //1 查询该场景包的所有指标列表存入 redis,包删了无所谓,但要过滤删掉的指标
-        List<IndexTemplatePO> allIndexList = indexTemplateMapper.selectByPackageIdIncludeDeleted(packageId);
-        stringRedisTemplate.opsForValue().set(allIndexPrefix, JsonUtil.listToJson(allIndexList));
-
-        //2 查询场景包叶子指标
-        List<IndexTemplatePO> leafIndexList = allIndexList.stream().filter(index -> StringUtil.isNotEmpty(index.getRuleId())).collect(Collectors.toList());
-        stringRedisTemplate.opsForValue().set(leafIndexPrefix, JsonUtil.listToJson(leafIndexList));
-        log.info("ProjectService--handlePackage 项目 " + projectId + " 的叶子指标为:" + leafIndexList);
-        List<ScenePO> sceneList = new ArrayList<>();
-        leafIndexList.forEach(leafIndex -> {
-            String naturalIds = leafIndex.getSceneNaturalIds();
-            String standardIds = leafIndex.getSceneStatueIds();
-            String accidentIds = leafIndex.getSceneTrafficIds();
-            String generalizationIds = leafIndex.getSceneGeneralizationIds();
-            if (StringUtil.isNotEmpty(naturalIds)) {
-                List<String> naturalIdList = new ArrayList<>(Arrays.asList(naturalIds.split(",")));
-                sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
-            }
-            if (StringUtil.isNotEmpty(standardIds)) {
-                List<String> standardIdList = new ArrayList<>(Arrays.asList(standardIds.split(",")));
-                sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
-            }
-            if (StringUtil.isNotEmpty(accidentIds)) {
-                List<String> accidentIdList = new ArrayList<>(Arrays.asList(accidentIds.split(",")));
-                sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
-            }
-            if (StringUtil.isNotEmpty(generalizationIds)) {
-                List<String> generalizationIdList = new ArrayList<>(Arrays.asList(generalizationIds.split(",")));
-                sceneList.addAll(sceneMapper.selectGeneralizationByIdList(generalizationIdList));
-            }
-        });
-        log.info("ProjectService--handlePackage 项目" + projectId + " 共有 " + sceneList.size() + " 个任务!");
-        return sceneList;
-    }
-
-
-//    /**
-//     * @param parallelism          并行度
-//     * @param projectRunningPrefix
-//     * @param userId
-//     * @param projectId
-//     * @param projectType
-//     * @param nodeMap              节点
-//     * @param algorithmDockerImage 算法
-//     * @param videoTime            视频长度
-//     * @param scenePOSet
-//     * @param vehiclePO
-//     * @param cameraPOList
-//     * @param ogtPOList
-//     */
-//    @SneakyThrows
-//    public void sendTaskMessage(int parallelism,
-//                                String projectRunningPrefix,
-//                                String userId,
-//                                String projectId,
-//                                String projectType,
-//                                Map<String, Integer> nodeMap,
-//                                String algorithmDockerImage,
-//                                Long videoTime,
-//                                Set<ScenePO> scenePOSet,
-//                                VehiclePO vehiclePO,
-//                                List<CameraPO> cameraPOList,
-//                                List<OgtPO> ogtPOList) {
-//        List<NodeTO> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
-//        final int[] messageNumber = CollectionUtil.createIntArray(0);
-//        ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, parallelism, (short) 1);   // 创建主题
-//        log.info("ProjectService--sendTaskMessage 项目 " + projectId + " 获得的包括的场景信息为:" + scenePOSet);
-//        for (ScenePO scenePO : scenePOSet) {
-//            String sceneId = scenePO.getId();
-//            //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
-//            List<String> lastTargetIdList = null;
-//            if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-//                lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
-//            } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-//                lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
-//            }
-//            lastTargetIdList.forEach(lastTargetId -> {
-//                String taskId = StringUtil.getRandomUUID();
-//                String taskRetryPrefix = projectRunningPrefix + ":task:" + taskId + ":retry";
-//                String taskMessagePrefix = projectRunningPrefix + ":task:" + taskId + ":message";
-//                // 设置任务重试次数为 0,方便任务进行最大3次的重试。
-//                stringRedisTemplate.opsForValue().set(taskRetryPrefix, "0");
-//                // 保存任务信息
-//                TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
-//                        .id(taskId)
-//                        .pId(projectId)
-//                        .sceneId(sceneId)
-//                        .lastTargetId(lastTargetId)
-//                        .sceneName(scenePO.getName())
-//                        .sceneType(scenePO.getType())
-//                        .runState(DictConstants.TASK_PENDING)
-//                        .runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId)
-//                        .build();
-//                taskPO.setCreateTime(TimeUtil.getNowForMysql());
-//                taskPO.setCreateUserId(userId);
-//                taskPO.setModifyTime(TimeUtil.getNowForMysql());
-//                taskPO.setModifyUserId(userId);
-//                taskPO.setModifyTime(TimeUtil.getNowForMysql());
-//                taskPO.setIsDeleted("0");
-//                taskMapper.insert(taskPO);
-//                // 下载 xodr 和 osgb 供仿真后生成 xosc
-//                String scenarioOdr = scenePO.getScenarioOdr();
-//                String scenarioOsgb = scenePO.getScenarioOsgb();
-//                String[] splitXodr = scenarioOdr.split("/");
-//                String xodrName = splitXodr[splitXodr.length - 1];
-//                String[] splitOsgb = scenarioOsgb.split("/");
-//                String osgbName = splitOsgb[splitOsgb.length - 1];
-//                try {
-//                    String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
-//                    String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + ".xodr";
-//                    String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
-//                    String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + ".osgb";
-//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
-//                    MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
-//                    log.info("ProjectService--sendTaskMessage 已经将 xodr 上传到 minio 的结果文件目录:" + xodrPathOfMinio);
-//                    MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
-//                    MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
-//                    log.info("ProjectService--sendTaskMessage 已经将 osgb 上传到 minio 的结果文件目录:" + osgbPathOfMinio);
-//                } catch (IOException | ServerException | InsufficientDataException | ErrorResponseException |
-//                         NoSuchAlgorithmException | InvalidKeyException | InvalidResponseException |
-//                         XmlParserException | InternalException e) {
-//                    throw new RuntimeException(e);
-//                }
-//
-//                // 组装 task 消息
-//                TaskTO taskTO = TaskTO.builder()
-//                        .info(InfoTO.builder()
-//                                .project_id(taskPO.getPId())
-//                                .task_id(taskPO.getId())
-//                                .task_path(taskPO.getRunResultFilePath())
-//                                .default_time(videoTime)
-//                                .build())
-//                        .scenario(ScenarioTO.builder()
-//                                .scenario_osc(scenePO.getScenarioOsc())
-//                                .scenario_odr(scenarioOdr)
-//                                .scenario_osgb(scenarioOsgb)
-//                                .build())
-//                        .vehicle(VehicleTO.builder()
-//                                .model(ModelTO.builder()
-//                                        .model_label(vehiclePO.getModelLabel())
-//                                        .build())
-//                                .dynamics(DynamicsTO.builder()
-//                                        .dynamics_maxspeed(vehiclePO.getMaxSpeed())
-//                                        .dynamics_enginepower(vehiclePO.getEnginePower())
-//                                        .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
-//                                        .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
-//                                        .dynamics_mass(vehiclePO.getMass())
-//                                        .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
-//                                        .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
-//                                        .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
-//                                        .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
-//                                        .dynamics_wheeldrive(vehiclePO.getWheelDrive())
-//                                        .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
-//                                        .dynamics_distfront(vehiclePO.getFrontDistance())
-//                                        .dynamics_distrear(vehiclePO.getRearDistance())
-//                                        .dynamics_distleft(vehiclePO.getLeftDistance())
-//                                        .dynamics_distright(vehiclePO.getRightDistance())
-//                                        .dynamics_distheight(vehiclePO.getHeightDistance())
-//                                        .dynamics_wheelbase(vehiclePO.getWheelbase())
-//                                        .build())
-//                                .sensors(SensorsTO.builder()   // 根据 vehicleId 查询绑定的传感器列表
-//                                        .camera(cameraPOList)
-//                                        .OGT(ogtPOList)
-//                                        .build())
-//                                .build())
-//                        .build();
-//
-//                //4-4 将对象转成 json
-//                String taskJson = "";
-//                try {
-//                    taskJson = JsonUtil.beanToJson(taskTO);
-//                } catch (JsonProcessingException e) {
-//                    e.printStackTrace();
-//                }
-//                //4-5 将 projectId 作为 topic 名称,根据 parallelism 分散发送 task 信息到 kafka
-//                String finalTaskJson = taskJson;
-//                stringRedisTemplate.opsForValue().set(taskMessagePrefix, finalTaskJson);
-//                kafkaTemplate.send(projectId, messageNumber[0] % parallelism, "", taskJson).addCallback(success -> {
-//                    // 消息发送到的topic
-//                    String topic = success.getRecordMetadata().topic();
-//                    // 消息发送到的分区
-//                    int partition = success.getRecordMetadata().partition();
-//                    // 消息在分区内的offset
-//                    long offset = success.getRecordMetadata().offset();
-//                    log.info("------- ProjectConsumer 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset + " 消息体为:" + finalTaskJson);
-//                    //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
-//                    // 选一个count 最少的node,如果 count 是 0 则直接启动。
-//                    AtomicReference<String> currentNodeName = new AtomicReference<>("");
-//                    AtomicInteger currentCount = new AtomicInteger(Integer.MAX_VALUE);
-//                    nodeListToCount.forEach(nodeTO -> {
-//                        int tempCount = nodeTO.getCount();
-//                        String tempNodeName = nodeTO.getNodeName();
-//                        if (tempCount < currentCount.get()) {
-//                            currentCount.set(tempCount);
-//                            currentNodeName.set(tempNodeName);
-//                            nodeTO.setCount(tempCount + 1);
-//                        }
-//                    });
-//                    String currentNodeNameValue = currentNodeName.get();
-//                    int currentCountValue = currentCount.get();
-//                    projectManager.createTempYaml(projectId, algorithmDockerImage, currentNodeNameValue, partition, offset, currentCountValue);
-//                }, failure -> log.error("------- 发送消息失败:" + failure.getMessage()));
-//                messageNumber[0] = messageNumber[0] + 1;
-//            });
-//        }
-//        log.info("ProjectService--sendTaskMessage 共发送了 " + messageNumber[0] + " 条消息!");
-//
-//    }
 
 
     /**
@@ -439,13 +225,15 @@ public class ProjectService {
                 if (DictConstants.ALGORITHM_UPLOAD_MODE_FILE.equals(uploadMode)) {
                     algorithmTarLinuxTempPath = linuxTempPath + "algorithm-file/" + algorithmCode + "/" + algorithmCode + ".tar";
                     String minioPath = algorithmPO.getMinioPath();
-                    MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);    // 下载算法文件到本地
+                    log.info("handleAlgorithm() 下载 minio 算法文件 " + minioPath + " 到本地 " + algorithmTarLinuxTempPath);
+                    MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
                 } else if (DictConstants.ALGORITHM_UPLOAD_MODE_GIT.equals(uploadMode)) {
                     algorithmDirectoryLinuxTempPath = linuxTempPath + "algorithm-git/" + algorithmCode + "/";
                     String gitUrl = algorithmPO.getGitUrl().replace(gitConfiguration.getName(), gitConfiguration.getUrl());
                     String gitUserName = algorithmPO.getGitUserName();
                     String gitPassword = algorithmPO.getGitPassword();
-                    GitUtil.clone(gitUrl, gitUserName, gitPassword, algorithmDirectoryLinuxTempPath, true); // 下载算法文件到目录
+                    log.info("handleAlgorithm() 下载 git 算法文件 " + gitUrl + " 到本地 " + algorithmDirectoryLinuxTempPath);
+                    GitUtil.clone(gitUrl, gitUserName, gitPassword, algorithmDirectoryLinuxTempPath, true);
                     for (String filename : Objects.requireNonNull(new File(algorithmDirectoryLinuxTempPath).list())) {
                         if (filename.endsWith(".tar")) {
                             algorithmTarLinuxTempPath = algorithmDirectoryLinuxTempPath + filename;
@@ -501,8 +289,6 @@ public class ProjectService {
     }
 
 
-
-
     /**
      * @param projectId   手动项目 id 或自动项目子id
      * @param projectType 项目类型

+ 14 - 14
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/MinioUtil.java

@@ -29,13 +29,14 @@ public class MinioUtil {
     public static void rmR(MinioClient minioClient, String bucketName, String prefix) {
         Iterable<Result<Item>> list = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName)
                 .prefix(prefix).recursive(true).build());
-        if (list == null) {
+        if (list == null || !list.iterator().hasNext()) {
             log.info(prefix + " 不存在。");
             return;
         }
         for (Result<Item> object : list) {
             rm(minioClient, bucketName, object.get().objectName());
         }
+        log.info("rmR() 删除 minio 目录 " + bucketName + "/" + prefix + "完成。");
     }
 
     /**
@@ -154,24 +155,23 @@ public class MinioUtil {
     /**
      * 下载文件
      */
-    public static void downloadToFile(
-            MinioClient minioClient,
-            String bucketName,
-            String objectName,
-            String targetFilePath
-    ) throws IOException, ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, io.minio.errors.InternalException {
-
+    @SneakyThrows
+    public static void downloadToFile(MinioClient minioClient, String bucket, String object, String targetFilePath) {
         File file = new File(targetFilePath);
         if (!file.getParentFile().exists()) {
             boolean mkdir = file.getParentFile().mkdirs();
         }
-        minioClient.downloadObject(DownloadObjectArgs.builder()
-                .bucket(bucketName)
-                .object(objectName)
-                .filename(targetFilePath)
-                .build());
+        boolean objectExist = isObjectExist(minioClient, bucket, object);
+        if (objectExist) {
+            minioClient.downloadObject(DownloadObjectArgs.builder()
+                    .bucket(bucket)
+                    .object(object)
+                    .filename(targetFilePath)
+                    .build());
+        } else {
+            throw new RuntimeException("downloadToFile() minio 文件不存在。");
+        }
     }
-
     /**
      * 下载文件
      */