夜得朦胧 1 yıl önce
ebeveyn
işleme
1aaf8a521e

+ 6 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/service/ProjectApplicationService.java

@@ -618,8 +618,8 @@ public class ProjectApplicationService {
         for (MultiCreateYamlRet redisKey : yamlList) {
             taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationTaskStatusEnum.RUN_STATUS.getProjectStatus(), redisKey.getTaskId(), 0);
         }
-        // 保存每个机器的并行度
-        projectDomainService.setMultiNodeMapUse(isChoiceGpu, multiNodeMapToUse);
+//         保存每个机器的并行度
+//        projectDomainService.setMultiNodeMapUse(isChoiceGpu, multiNodeMapToUse);
     }
 
     /**
@@ -1119,7 +1119,8 @@ public class ProjectApplicationService {
                 String mapPath = simulationMapVO.getMapPath();
                 String mapOsgbPath = simulationMapVO.getMapOsgbPath();
 
-                String mapMinioPath = mapPath.substring(mapPath.indexOf("/mapFile"), mapPath.indexOf("?"));
+//                String mapMinioPath = mapPath.substring(mapPath.indexOf("/mapFile"), mapPath.indexOf("?"));
+                String mapMinioPath = mapPath;
                 String[] mapDriverSp = mapMinioPath.split("/");
                 String mapDriverSpName = mapDriverSp[mapDriverSp.length - 1];
                 String[] mapDriverNameSp = mapDriverSpName.split("\\.");
@@ -1130,7 +1131,8 @@ public class ProjectApplicationService {
                 MinioUtil.uploadFromFile(minioClient, mapDriverLinuxPath, bucketName, mapDriverPathOfMinio);
                 FileUtil.rm(mapDriverLinuxPath);   // 删除临时文件
 
-                String mapOsgMinioPath = mapOsgbPath.substring(mapOsgbPath.indexOf("/mapFile"), mapOsgbPath.indexOf("?"));
+//                String mapOsgMinioPath = mapOsgbPath.substring(mapOsgbPath.indexOf("/mapFile"), mapOsgbPath.indexOf("?"));
+                String mapOsgMinioPath = mapOsgbPath;
                 String[] mapOsgSp = mapOsgMinioPath.split("/");
                 String mapOsgSpName = mapOsgSp[mapOsgSp.length - 1];
                 String[] mapOsgSpNameSp = mapOsgSpName.split("\\.");

+ 51 - 32
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/service/TaskApplicationService.java

@@ -264,11 +264,17 @@ public class TaskApplicationService {
                 log.info("修改任务 {} 的状态为 {} ,pod 名称为 {} ,并删除 pod。", taskId, state, podName);
                 // 已完成
                 // 删除pod
-                KubernetesUtil.deletePod2(apiClient, kubernetesConfiguration.getMultiNamespace(), podName);
+                try {
+                    KubernetesUtil.deletePod2(apiClient, kubernetesConfiguration.getMultiNamespace(), podName);
+                }catch (Exception e){
+                    log.info("删除pod失败,projectId:{},taskId:{}",projectId,taskId, e);
+                }
                 // 并行度加一
-                projectDomainService.incrementOneParallelism(isChoiceGpu, nodeName);
+                projectDomainService.incrementOneMultiParallelism(isChoiceGpu, nodeName);
                 // 释放证书
                 projectDomainService.releaseLicense(projectDomainService.getClusterUserIdByProjectUserId(projectUserId), DictConstants.MODEL_TYPE_VTD, 1);
+                // 删除nodeName相关信息
+                stringRedisTemplate.delete(nodeNameKey);
                 // 执行完成待分析状态
                 if (DictConstants.TASK_ANALYSIS.equals(state)){
                     taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationTaskStatusEnum.NEED_ANALYSIS_STATUS.getProjectStatus(), taskEntity.getId(), 0);
@@ -277,21 +283,31 @@ public class TaskApplicationService {
                     MultiTaskMessageEntity messageEntity = JSONObject.parseObject(taskBody, MultiTaskMessageEntity.class);
                     String taskPath = messageEntity.getInfo().getTask_path();
                     List<String> list = MinioUtil.listAllFileName(minioClient, bucketName, taskPath);
+                    String linuxPath = linuxTempPath + "multiProject/" + projectId + "/" + taskId + "/";
+
                     String csvName = null;
                     String allMp4Url = null;
                     String simulationMp4Url = null;
                     for (String str : list) {
-                        if (StringUtils.contains(str, "csv")) {
-                            csvName = str;
+                        String split = str.substring(str.lastIndexOf("/") + 1);
+                        if (StringUtils.contains(split, ".mp4")) {
+                            if (StringUtils.contains(split, "_custom_camera")){
+//                                allMp4Url = split;
+                                allMp4Url = str;
+                            }else {
+//                                simulationMp4Url = split;
+                                simulationMp4Url = str;
+                            }
+                        }else{
+                            csvName = split;
+                            String linuxFile = linuxPath + split;
+                            String minioPathCsv = taskPath + split;
+                            MinioUtil.downloadToFile(minioClient, bucketName, minioPathCsv, linuxFile);
                         }
                     }
                     // 文件存在
                     if (StringUtils.isNotBlank(csvName)) {
-                        String linuxPath = linuxTempPath + "multiProject/" + projectId + "/" + taskId + "/";
-                        String linuxFile = linuxPath + csvName;
-                        String minioPathCsv = taskPath + csvName;
-                        MinioUtil.downloadToFile(minioClient, bucketName, minioPathCsv, linuxFile);
-                        String pythonCom = "python3 " + multiVtdPodTemplateAnaPy + " --csvFile=\"" + linuxPath + "\"" + " --outputResultFile=\"" + linuxPath + "\"";
+                        String pythonCom = "python3 " + multiVtdPodTemplateAnaPy + " --csvFile=" + linuxPath + "" + " --outputResultFile=" + linuxPath + "result.json";
                         LinuxUtil.execute(pythonCom);
                         Thread.sleep(10000);
                         // 获取到json结果,并插入数据库
@@ -313,26 +329,27 @@ public class TaskApplicationService {
                         multiSimulationProjectResultPO.setId(StringUtil.getRandomUUID()).setSceneId(taskEntity.getSceneId())
                             .setAbnormalTimeDescription(jsonObject.getString("phrases")).setAbnormalType(MultiSimulationResultTypeEnum.LAST_DESCRIPTION.getResultType());
                         multiSimulationResultMapper.insertProjectResult(multiSimulationProjectResultPO);
-                        String urlMp4Pub = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
-                                .method(Method.GET)
-                                .bucket(bucketName)
-                                .object(allMp4Url)
-                                .build())
-                            .replace(minioConfiguration.getEndpointPrivate(), minioConfiguration.getEndpointPublic());
-                        String simUrlMp4Pub = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
-                                .method(Method.GET)
-                                .bucket(bucketName)
-                                .object(simulationMp4Url)
-                                .build())
-                            .replace(minioConfiguration.getEndpointPrivate(), minioConfiguration.getEndpointPublic());
-                        String csvUrlPub = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
-                                .method(Method.GET)
-                                .bucket(bucketName)
-                                .object(minioPathCsv)
-                                .build())
-                            .replace(minioConfiguration.getEndpointPrivate(), minioConfiguration.getEndpointPublic());
-                        multiSimulationSceneMapper.updateSceneResultById(csvUrlPub, urlMp4Pub, simUrlMp4Pub, taskEntity.getSceneId());
 
+//                        String urlMp4Pub = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
+//                                .method(Method.GET)
+//                                .bucket(bucketName)
+//                                .object(allMp4Url)
+//                                .build())
+//                            .replace(minioConfiguration.getEndpointPrivate(), minioConfiguration.getEndpointPublic());
+//                        String simUrlMp4Pub = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
+//                                .method(Method.GET)
+//                                .bucket(bucketName)
+//                                .object(simulationMp4Url)
+//                                .build())
+//                            .replace(minioConfiguration.getEndpointPrivate(), minioConfiguration.getEndpointPublic());
+
+//                        String csvUrlPub = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
+//                                .method(Method.GET)
+//                                .bucket(bucketName)
+//                                .object(minioPathCsv)
+//                                .build())
+//                            .replace(minioConfiguration.getEndpointPrivate(), minioConfiguration.getEndpointPublic());
+                        multiSimulationSceneMapper.updateSceneResultById("", allMp4Url, simulationMp4Url, taskEntity.getSceneId());
                     } else {
                         log.info("taskId:{}未找到csv文件", taskId);
                     }
@@ -344,7 +361,7 @@ public class TaskApplicationService {
                 } else {
                     // 终止
                     log.info("未知的反馈状态类型projectId:{},taskId:{},state:{}", projectId, taskId, state);
-                    return;
+//                    return;
 //                    taskRecordMapper.updateMultiSimulationProjectTaskRecordStatus(MultiSimulationStatusEnum.TERMINATED_STATUS.getProjectStatus(), taskEntity.getId(), 0);
 //                    log.info("taskId:{},项目已经停止", taskId);
                 }
@@ -376,9 +393,11 @@ public class TaskApplicationService {
                 // 删除记录podNamekey
                 RedisUtil.deleteByPrefix(stringRedisTemplate, "multi_project:" + projectId);
             }
-        } catch (io.kubernetes.client.openapi.ApiException apiException) {
-            log.info("POD:" + podName + "已删除。");
-        } catch (Exception e) {
+        }
+//        catch (io.kubernetes.client.openapi.ApiException apiException) {
+//            log.info("POD:" + podName + "已删除。");
+//        }
+        catch (Exception e) {
             log.error("项目 {} 已结束。", projectId, e);
         } finally {
             customRedisClient.unlock(lock1);

+ 32 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/domain/service/ProjectDomainService.java

@@ -1030,6 +1030,38 @@ public class ProjectDomainService {
         }
 
 
+    }
+    public void incrementOneMultiParallelism(String isChoiceGpu, String nodeName) {
+        incrementMultiParallelism(isChoiceGpu, nodeName, 1L);
+    }
+    public void incrementMultiParallelism(String isChoiceGpu, String nodeName, long number) {
+        if (DictConstants.USE_GPU.equals(isChoiceGpu)) {
+            String key = "gpu-node:" + nodeName + ":parallelism";
+            final int currentRestParallelism = Integer.parseInt(customRedisClient.get(key));
+            final List<NodeEntity> nodeList = kubernetesConfiguration.getMultiGpuNodeList();
+            for (NodeEntity node : nodeList) {
+                if (nodeName.equals(node.getHostname())) {
+                    if (currentRestParallelism + 1 <= node.getParallelism()) {
+                        customRedisClient.increment(key, number);
+                    }
+                }
+            }
+            log.info("归还 GPU 节点 {} 的 {} 个并行度。", nodeName, number);
+        } else if (DictConstants.USE_CPU.equals(isChoiceGpu)) {
+            String key = "cpu-node:" + nodeName + ":parallelism";
+            final int currentRestParallelism = Integer.parseInt(customRedisClient.get(key));
+            final List<NodeEntity> nodeList = kubernetesConfiguration.getMultiCpuNodeList();
+            for (NodeEntity node : nodeList) {
+                if (nodeName.equals(node.getHostname())) {
+                    if (currentRestParallelism + 1 <= node.getParallelism()) {
+                        customRedisClient.increment(key, number);
+                    }
+                }
+            }
+            log.info("归还 CPU 节点 {} 的 {} 个并行度。", nodeName, number);
+        }
+
+
     }
 
     public void decrementParallelism(String isChoiceGpu, String nodeName, long number) {

+ 2 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/db/mysql/mapper/MultiSimulationProjectMapper.java

@@ -27,7 +27,8 @@ public interface MultiSimulationProjectMapper {
         "where id = #{projectId}")
     int updateMultiSimulationProjectStatus(MultiSimulationProjectParam param);
 
-    @Select("select id,project_key as projectKey,project_name as projectName,project_status as projectStatus,create_time as createTime,project_description as projectDescription from multi_simulation_project where deleted = 0 " +
+    @Select("select id,project_key as projectKey,project_name as projectName,project_status as projectStatus,create_time as createTime,project_description as projectDescription, " +
+        "project_user_id as projectUserId from multi_simulation_project where deleted = 0 " +
         "and id = #{id} limit 1")
     MultiSimulationProjectVO selectMultiSimulationProjectById(@Param("id") String projectId);
 }