martin 3 年之前
父節點
當前提交
cbfdef90a5

+ 9 - 0
simulation-resource-common/src/main/java/com/css/simulation/resource/common/controller/MinioController.java

@@ -106,6 +106,15 @@ public class MinioController {
         return new ResponseBodyVO<>(ResponseBodyVO.Response.SUCCESS, objects);
     }
 
+    @PostMapping("/listDeepOne")
+    public ResponseBodyVO<List<String>> listDeepOne(
+            @RequestBody @Validated MinioParameter minioParameter
+    ) throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
+
+        List<String> objects = MinioUtil.listObjectsUnRecursive(minioClientPrivate, bucketName, minioParameter.getObjectName());
+        return new ResponseBodyVO<>(ResponseBodyVO.Response.SUCCESS, objects);
+    }
+
     @PostMapping("/download")
     public void download(
             @RequestBody @Validated MinioParameter minioParameter,

+ 35 - 0
simulation-resource-common/src/main/java/com/css/simulation/resource/common/util/MinioUtil.java

@@ -22,6 +22,41 @@ import java.util.List;
 
 public class MinioUtil {
 
+
+    /**
+     * 获取下一级文件列表
+     *
+     * @param minioClient minio 客户端
+     * @param bucket      bucket 名称
+     * @param prefix      目录名
+     * @return 文件路径列表
+     */
+    public static List<String> listObjectsUnRecursive(MinioClient minioClient, String bucket, String prefix) throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, io.minio.errors.InternalException {
+
+        if (!prefix.endsWith("/")) {
+            prefix = prefix + "/";
+        }
+        int length = prefix.length();
+        Iterable<Result<Item>> objects = minioClient.listObjects(
+                ListObjectsArgs.builder()
+                        .bucket(bucket)
+                        .recursive(true)
+                        .prefix(prefix)
+                        .build()
+        );
+        List<String> result = new ArrayList<>();
+        for (Result<Item> next : objects) {
+            Item item = next.get();
+            String objectName = item.objectName();
+            String substring = objectName.substring(length);
+            String[] split = substring.split("/");
+            result.add(prefix + split[0]);
+        }
+
+        return result;
+    }
+
+
     /**
      * 判断 bucket 是否存在
      */

+ 33 - 48
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -109,13 +109,15 @@ public class ManualProjectConsumer {
         // 根据 packageId,拿到场景 id,然后获取每个场景的文件地址。
         String packageId = projectMessageDTO.getScenePackageId();
         List<IndexTemplatePO> leafIndexList = indexTemplateMapper.selectLeafIndexByPackageId(packageId);
-        List<String> naturalIdList = new ArrayList<>();
-        List<String> standardIdList = new ArrayList<>();
-        List<String> accidentIdList = new ArrayList<>();
-        for (IndexTemplatePO indexTemplatePO : leafIndexList) {
-            String naturalIds = indexTemplatePO.getSceneNaturalIds();
-            String standardIds = indexTemplatePO.getSceneStatueIds();
-            String accidentIds = indexTemplatePO.getSceneTrafficIds();
+        log.info("------- ManualProjectConsumer 项目 " + projectId + " 的叶子指标为:" + leafIndexList);
+        List<ScenePO> sceneList = new ArrayList<>();
+        leafIndexList.forEach(leafIndex -> {
+            List<String> naturalIdList = new ArrayList<>();
+            List<String> standardIdList = new ArrayList<>();
+            List<String> accidentIdList = new ArrayList<>();
+            String naturalIds = leafIndex.getSceneNaturalIds();
+            String standardIds = leafIndex.getSceneStatueIds();
+            String accidentIds = leafIndex.getSceneTrafficIds();
             if (StringUtil.isNotEmpty(naturalIds)) {
                 String[] naturalIdArray = naturalIds.split(",");
                 naturalIdList.addAll(Arrays.asList(naturalIdArray));
@@ -128,17 +130,17 @@ public class ManualProjectConsumer {
                 String[] accidentIdArray = accidentIds.split(",");
                 accidentIdList.addAll(Arrays.asList(accidentIdArray));
             }
-        }
-        List<ScenePO> sceneList = new ArrayList<>();
-        if (CollectionUtil.isNotEmpty(naturalIdList)) {
-            sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
-        }
-        if (CollectionUtil.isNotEmpty(standardIdList)) {
-            sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
-        }
-        if (CollectionUtil.isNotEmpty(accidentIdList)) {
-            sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
-        }
+            if (CollectionUtil.isNotEmpty(naturalIdList)) {
+                sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
+            }
+            if (CollectionUtil.isNotEmpty(standardIdList)) {
+                sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
+            }
+            if (CollectionUtil.isNotEmpty(accidentIdList)) {
+                sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
+            }
+        });
+
         int taskNumber = sceneList.size();
         log.info("------- ManualProjectConsumer 共有 " + taskNumber + " 个任务!");
         projectMapper.updateTaskNumber(projectId, taskNumber);
@@ -255,28 +257,11 @@ public class ManualProjectConsumer {
                     });
                 });
 
-            }else {// 重新执行任务
-                taskIdList.forEach(taskId->{
+            } else {// 重新执行任务
+                taskIdList.forEach(taskId -> {
                     String resultPath = resultPathMinio + projectId + "/" + taskId;
                     String lastTargetId = taskMapper.selectLastTargetIdById(taskId);
-                    // 保存任务信息
-                    TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
-                            .id(taskId)
-                            .pId(projectId)
-                            .sceneId(sceneId)
-                            .lastTargetId(lastTargetId)
-                            .sceneName(scenePO.getName())
-                            .sceneType(scenePO.getType())
-                            .runState(DictConstants.TASK_PENDING)
-                            .runResultFilePath(resultPath)
-                            .build();
-                    taskPO.setCreateTime(TimeUtil.getNowForMysql());
-                    taskPO.setCreateUserId(USER_ID);
-                    taskPO.setModifyTime(TimeUtil.getNowForMysql());
-                    taskPO.setModifyUserId(USER_ID);
-                    taskPO.setModifyTime(TimeUtil.getNowForMysql());
-                    taskPO.setIsDeleted("0");
-                    taskMapper.insert(taskPO);
+                    taskMapper.updateStateById(DictConstants.TASK_PENDING, taskId);
                     // 心跳信息存在緩存中
                     redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId, TimeUtil.getNowString());
                     // 组装 task 消息
@@ -409,17 +394,16 @@ public class ManualProjectConsumer {
             LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
         } else {
             String minioPath = algorithmPO.getMinioPath();
-
             if ("0".equals(algorithmPO.getDockerImport())) {
-
                 // 下载算法文件到本地( 2 到仓库服务器)
                 MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
                 //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
                 LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
+                algorithmMapper.updateDockerImportAndDockerImageById("1", dockerImage, algorithmId);
             } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
                 dockerImage = algorithmPO.getDockerImage();
             } else {
-                throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
+                throw new RuntimeException("算法 " + algorithmId + " 的 mysql 数据有误!");
             }
         }
 
@@ -435,13 +419,14 @@ public class ManualProjectConsumer {
         String replace0 = yamlSource.replace("job-cloud-simulation", "project-" + projectId);
         String replace1 = replace0.replace("vtd-container", "vtd-" + projectId);
         String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
-        String replace3 = replace2.replace("projectId", projectId);
-        String replace4 = replace3.replace("completions-number", completions + "");
-        String replace5 = replace4.replace("parallelism-number", parallelism + "");
-        String replace6 = replace5.replace("apiVers1on", "apiVersion");
-        String replace7 = replace6.replace("1atch/v1", "batch/v1");
-        log.info("------- ManualProjectConsumer 开始执行 yaml 文件" + replace7);
-        FileUtil.writeStringToLocalFile(replace7, jobTemplateYamlPathTarget);
+        String replace3 = replace1.replace("algorithm-image", dockerImage);
+        String replace4 = replace3.replace("projectId", projectId);
+        String replace5 = replace4.replace("completions-number", completions + "");
+        String replace6 = replace5.replace("parallelism-number", parallelism + "");
+        String replace7 = replace6.replace("apiVers1on", "apiVersion");
+        String replace8 = replace7.replace("1atch/v1", "batch/v1");
+        log.info("------- ManualProjectConsumer 开始执行 yaml 文件" + replace8);
+        FileUtil.writeStringToLocalFile(replace8, jobTemplateYamlPathTarget);
         LinuxUtil.execute("kubectl apply -f " + jobTemplateYamlPathTarget);
     }
 

+ 5 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/AlgorithmMapper.java

@@ -28,4 +28,9 @@ public interface AlgorithmMapper {
             "  and id = #{id}")
     AlgorithmPO selectById(@Param("id") String id);
 
+    @Update("update algorithm\n" +
+            "set docker_import = #{dockerImport},\n" +
+            "    docker_image  = #{dockerImage}\n" +
+            "where id = #{algorithmId}")
+    void updateDockerImportAndDockerImageById(@Param("dockerImport") String dockerImport, @Param("dockerImage") String dockerImage, @Param("algorithmId") String algorithmId);
 }

+ 6 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -103,4 +103,10 @@ public interface TaskMapper {
             "set run_state = #{runState}\n" +
             "where p_id = #{projectId}")
     void updateStateByProjectId(@Param("projectId") String projectId, @Param("runState") String runState);
+
+    @Update("update simulation_manual_project_task\n" +
+            "set run_state = #{runState}\n" +
+            "where id = #{taskId}")
+    void updateStateById(@Param("runState") String runState,@Param("taskId") String taskId);
+
 }

+ 1 - 1
simulation-resource-scheduler/src/main/resources/kubernetes/template/job/job-template.yaml

@@ -36,7 +36,7 @@ spec:
           securityContext:
             privileged: true
         - name: algorithm-container
-          image: aeb.ros:latest
+          image: algorithm-image
           imagePullPolicy: Never
           command: [ "/bin/sh", "-c", "/AEB/start_docker.sh; touch /tmp/hello.txt;while true;do /bin/echo $(date +%T) >> /tmp/hello.txt; sleep 5; done;" ]
       restartPolicy: Never