Forráskód Böngészése

优化项目启动前文件复制

LingxinMeng 11 hónapja
szülő
commit
96110a9617

+ 4 - 6
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/adapter/consumer/ProjectConsumer.java

@@ -27,7 +27,7 @@ public class ProjectConsumer {
      */
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${custom.mq-start-project-topic}")
     public void acceptMessage(ConsumerRecord<String, String> projectStartMessageRecord) {
-        log.info("消费者组 simulation-resource-scheduler 接收到项目开始消息:{}", projectStartMessageRecord);
+        log.info("消费者组 simulation-resource-scheduler 接收到标准化测试项目开始消息:{}", projectStartMessageRecord);
         projectApplicationService.runProject(JsonUtil.jsonToBean(projectStartMessageRecord.value(), ProjectStartMessageEntity.class));
     }
 
@@ -37,7 +37,7 @@ public class ProjectConsumer {
      */
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${custom.mq-stop-project-topic}")
     public void stopProject(ConsumerRecord<String, String> projectStopMessageRecord) {
-        log.info("消费者组 simulation-resource-scheduler 接收到的项目终止消息:{}", projectStopMessageRecord);
+        log.info("消费者组 simulation-resource-scheduler 接收到的标准化测试项目终止消息:{}", projectStopMessageRecord);
         try {
             TimeUnit.SECONDS.sleep(10);
             projectApplicationService.stopProject(JsonUtil.jsonToBean(projectStopMessageRecord.value(), ProjectStopMessageEntity.class));
@@ -48,16 +48,14 @@ public class ProjectConsumer {
 
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${custom.mq-start-multi-project-topic}")
     public void acceptMultiMessage(ConsumerRecord<String, String> projectStartMessageRecord) {
-        log.info("消费者组 simulation-resource-scheduler 接收到项目开始消息:" + projectStartMessageRecord);
+        log.info("消费者组 simulation-resource-scheduler 接收到多模式项目开始消息:{}", projectStartMessageRecord);
         projectApplicationService.runMultiProject(JsonUtil.jsonToBean(projectStartMessageRecord.value(), MultiSimulationProjectKafkaParam.class));
     }
 
 
-    /**
-     */
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${custom.mq-stop-multi-project-topic}")
     public void stopMultiProject(ConsumerRecord<String, String> projectStopMessageRecord) {
-        log.info("消费者组 simulation-resource-scheduler 接收到的项目终止消息:" + projectStopMessageRecord);
+        log.info("消费者组 simulation-resource-scheduler 接收到的多模式项目终止消息:{}", projectStopMessageRecord);
         try {
             TimeUnit.SECONDS.sleep(10);
             projectApplicationService.stopMultiProject(JsonUtil.jsonToBean(projectStopMessageRecord.value(), MultiSimulationProjectKafkaParam.class));

+ 96 - 87
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/service/ProjectApplicationService.java

@@ -229,47 +229,56 @@ public class ProjectApplicationService {
                             String xoscName = splitXosc[splitXosc.length - 1];
                             String[] xoscNameSplit = xoscName.split("\\.");
                             String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
-                            String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
+//                            String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
                             String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
-                            MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
-                            MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
-                            FileUtil.rm(xoscPathOfLinux);   // 删除临时文件
+                            {
+                                MinioUtil.copyFile(minioClient, bucketName, scenarioOsc, xoscPathOfMinio);
+//                            MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
+//                            MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
+//                            FileUtil.rm(xoscPathOfLinux);   // 删除临时文件
+                            }
+
 
                             String scenarioOdr = sceneEntity.getScenarioOdr();
                             String[] splitXodr = scenarioOdr.split("/");
                             String xodrName = splitXodr[splitXodr.length - 1];
                             String[] xodrNameSplit = xodrName.split("\\.");
                             String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
-                            String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
+//                            String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
                             String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
-                            MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
-                            MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
-                            FileUtil.rm(xodrPathOfLinux);   // 删除临时文件
+                            {
+                                MinioUtil.copyFile(minioClient, bucketName, scenarioOdr, xodrPathOfMinio);
+//                                MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
+//                                MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
+//                                FileUtil.rm(xodrPathOfLinux);   // 删除临时文件
+                            }
 
                             String scenarioOsgb = sceneEntity.getScenarioOsgb();
                             String[] splitOsgb = scenarioOsgb.split("/");
                             String osgbName = splitOsgb[splitOsgb.length - 1];
                             String[] osgbNameSplit = osgbName.split("\\.");
                             String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
-                            String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
+//                            String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
                             String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
-                            MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
-                            MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
-                            FileUtil.rm(osgbPathOfLinux);   // 删除临时文件
-
+                            {
+                                MinioUtil.copyFile(minioClient, bucketName, scenarioOsgb, osgbPathOfMinio);
+//                                MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
+//                                MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
+//                                FileUtil.rm(osgbPathOfLinux);   // 删除临时文件
+                            }
                             // 组装 task 消息
                             TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleEntity.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(DynamicsEntity.builder().dynamics_maxspeed(vehicleEntity.getMaxSpeed()).dynamics_enginepower(vehicleEntity.getEnginePower()).dynamics_maxdecel(vehicleEntity.getMaxDeceleration()).dynamics_maxsteering(vehicleEntity.getMaxSteeringAngle()).dynamics_mass(vehicleEntity.getMass()).dynamics_frontsurfaceeffective(vehicleEntity.getFrontSurfaceEffective()).dynamics_airdragcoefficient(vehicleEntity.getAirDragCoefficient()).dynamics_rollingresistance(vehicleEntity.getRollingResistanceCoefficient()).dynamics_wheeldiameter(vehicleEntity.getWheelDiameter()).dynamics_wheeldrive(vehicleEntity.getWheelDrive()).dynamics_overallefficiency(vehicleEntity.getOverallEfficiency()).dynamics_distfront(vehicleEntity.getFrontDistance()).dynamics_distrear(vehicleEntity.getRearDistance()).dynamics_distleft(vehicleEntity.getLeftDistance()).dynamics_distright(vehicleEntity.getRightDistance()).dynamics_distheight(vehicleEntity.getHeightDistance()).dynamics_wheelbase(vehicleEntity.getWheelbase()).build()).sensors(SensorsEntity.builder()   // 根据 vehicleId 查询绑定的传感器列表
                                     .camera(cameraEntityList).OGT(ogtEntityList).build()).build()).build();
                             FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
                             taskList.add(taskEntity);
-                            log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
+                            log.info("VTD项目 {} 将任务消息转成 json 保存到临时目录等待资源分配后执行:{}", projectId, taskMessageEntity.getInfo().getTask_id());
                         } catch (Exception e) {
                             log.error("组装数据失败." + e);
                         }
                     }
                 }
                 taskDomainService.batchInsertTask(taskList);
-                log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
+                log.info("VTD项目 {} 共有 {} 个任务,已保存到数据库", projectId, taskList.size());
             } else if (DictConstants.MODEL_TYPE_CARSIM.equals(modelType)) {
 
                 List<TaskEntity> taskList = new ArrayList<>();
@@ -336,14 +345,14 @@ public class ProjectApplicationService {
 
                             FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
                             taskList.add(taskEntity);
-                            log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
+                            log.info("CARSIM项目 {} 将任务消息转成 json 保存到临时目录等待资源分配后执行:{}", projectId, taskMessageEntity.getInfo().getTask_id());
                         } catch (Exception e) {
                             log.error("组装数据失败." + e);
                         }
                     }
                 }
                 taskDomainService.batchInsertTask(taskList);
-                log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
+                log.info("CARSIM项目 {} 共有 {} 个任务,已保存到数据库", projectId, taskList.size());
             }
 
         } catch (Exception e) {
@@ -363,7 +372,7 @@ public class ProjectApplicationService {
         final String waitingType = projectWaitQueueEntity.getWaitingType();
         final Integer waitingParallelism = projectWaitQueueEntity.getWaitingParallelism();
         final ProjectStartMessageEntity projectStartMessageEntity = projectWaitQueueEntity.getProjectStartMessageEntity();
-        log.info("判断用户是否拥有可分配资源:" + projectStartMessageEntity);
+        log.info("判断用户是否拥有可分配资源:{}", projectStartMessageEntity);
         //1 项目信息
         String modelType = projectStartMessageEntity.getModelType();
         String projectId = projectStartMessageEntity.getProjectId();    // 手动执行项目 id 或 自动执行子项目 id
@@ -381,7 +390,7 @@ public class ProjectApplicationService {
         String clusterUserId;  // 项目实际运行使用的用户集群
         if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
             clusterUserId = DictConstants.SYSTEM_USER_ID;
-            log.info("项目 " + projectId + " 的创建人 " + projectUserId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
+            log.info("项目 {} 的创建人 {} 为管理员账户或管理员子账户,直接判断服务器能否执行。", projectId, projectUserId);
             PrefixEntity redisPrefix = projectDomainService.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
             final String projectRunningKey = redisPrefix.getProjectRunningKey();
             if (remainderParallelism <= 0) {
@@ -396,16 +405,16 @@ public class ProjectApplicationService {
         } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
             clusterUserId = projectUserId;
             clusterEntity = clusterMapper.selectByUserId(clusterUserId);
-            log.info("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterEntity);
+            log.info("项目 {} 的创建人 {} 为普通账户(包括独占或共享都在自己的集群),集群为:{}", projectId, projectUserId, clusterEntity);
         } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
             if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) {   //3-3 普通子账户,根据自己的独占节点排队
                 clusterUserId = projectUserId;
                 clusterEntity = clusterMapper.selectByUserId(clusterUserId);
-                log.info("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通独占子账户(自己的集群),集群为:" + clusterEntity);
+                log.info("项目 {} 的创建人 {} 为普通独占子账户(自己的集群),集群为:{}", projectId, projectUserId, clusterEntity);
             } else if (DictConstants.USER_TYPE_PUBLIC.equals(useType)) {    //3-4 共享子账户,根据父账户的共享节点排队
                 clusterUserId = userEntity.getCreateUserId();
                 clusterEntity = clusterMapper.selectByUserId(clusterUserId);
-                log.info("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通共享子账户(父账户的集群),集群为:" + clusterEntity);
+                log.info("项目 {} 的创建人 {} 为普通共享子账户(父账户的集群),集群为:{}", projectId, projectUserId, clusterEntity);
             } else {
                 throw new RuntimeException("用户" + projectUserId + "未知占用类型:" + useType);
             }
@@ -533,11 +542,11 @@ public class ProjectApplicationService {
                 MultiProjectWaitQueueEntity waitQueueEntity = waitingQueue.get(i);
                 if (waitQueueEntity.getProjectId().equals(multiProjectWaitQueue.getProjectId())) {
                     contains = true;
-                    if (multiProjectWaitQueue.getWaitingParallelism() > 0){
+                    if (multiProjectWaitQueue.getWaitingParallelism() > 0) {
                         waitQueueEntity.setWaitingParallelism(multiProjectWaitQueue.getWaitingParallelism());
                         waitQueueEntity.setMultiTaskMessageEntityList(multiProjectWaitQueue.getMultiTaskMessageEntityList());
                         waitQueueEntity.setRunState(multiProjectWaitQueue.getRunState());
-                    }else {
+                    } else {
                         // 项目等待为0,则删除
                         waitingQueue.remove(i);
                     }
@@ -564,7 +573,7 @@ public class ProjectApplicationService {
                 waitingQueue = JsonUtil.jsonToList(waitingQueueJson, MultiProjectWaitQueueEntity.class);
             }
             Iterator<MultiProjectWaitQueueEntity> iterator = waitingQueue.iterator();
-            while (iterator.hasNext()){
+            while (iterator.hasNext()) {
                 MultiProjectWaitQueueEntity next = iterator.next();
                 if (next.getProjectId().equals(projectId)) {
                     iterator.remove();
@@ -590,7 +599,7 @@ public class ProjectApplicationService {
         String projectId = multiProjectWaitQueue.getProjectId();
 
         int parallel = parallelism;
-        if (multiTaskMessageEntityList.size()- multiProjectWaitQueue.getRunState()-1 < parallelism){
+        if (multiTaskMessageEntityList.size() - multiProjectWaitQueue.getRunState() - 1 < parallelism) {
             log.info("出现奇怪情况需要使用的并行度大于剩余任务数量parallelism:{},projectId:{},剩余:{}", parallelism, projectId, JSONObject.toJSONString(multiProjectWaitQueue));
             parallel = multiTaskMessageEntityList.size();
         }
@@ -607,7 +616,7 @@ public class ProjectApplicationService {
 //            SendResult<String, String> stringStringSendResult = kafkaTemplate.send(projectId, i % multiTaskMessageEntityList.size(),
 //                taskId, JSONObject.toJSONString(messageEntity)).get();
             SendResult<String, String> stringStringSendResult = kafkaTemplate.send(projectId, i % multiTaskMessageEntityList.size(),
-                taskId, JsonUtil.beanToJson(messageEntity)).get();
+                    taskId, JsonUtil.beanToJson(messageEntity)).get();
 
             RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
             String topic = recordMetadata.topic();  // 消息发送到的topic
@@ -615,21 +624,21 @@ public class ProjectApplicationService {
             long offset = recordMetadata.offset();  // 消息在分区内的offset
             log.info("多模式仿真任务发送消息成功, 主题 topic 为项目ID:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset);
 //            String sceneId = messageEntity.getInfo().getScene_id();
-            String modelName =null;
-            for (String name: multiNodeMapToUse.keySet()) {
+            String modelName = null;
+            for (String name : multiNodeMapToUse.keySet()) {
                 Integer integer = multiNodeMapToUse.get(name);
-                if (integer > 0){
+                if (integer > 0) {
                     modelName = name;
-                    multiNodeMapToUse.put(name, integer -1);
+                    multiNodeMapToUse.put(name, integer - 1);
                 }
             }
-            if (modelName == null){
+            if (modelName == null) {
                 throw new RuntimeException("未选取到可用的节点");
             }
             MultiSimulationSceneKafkaParam multiSimulationSceneKafkaParam = multiProjectWaitQueue.getKafkaParamList().get(i);
             MultiCreateYamlRet multiTempYaml = projectDomainService.createMultiTempYaml(projectId, multiSimulationSceneKafkaParam, messageEntity
-                ,multiProjectWaitQueue.getConnectorPath(),multiProjectWaitQueue.getControllerPath(),multiProjectWaitQueue.getSoftwarePath()
-                , modelName, partition, offset, isChoiceGpu);
+                    , multiProjectWaitQueue.getConnectorPath(), multiProjectWaitQueue.getControllerPath(), multiProjectWaitQueue.getSoftwarePath()
+                    , modelName, partition, offset, isChoiceGpu);
             multiTempYaml.setTaskId(messageEntity.getInfo().getTask_id());
             multiTempYaml.setNodeName(modelName);
             yamlList.add(multiTempYaml);
@@ -1061,6 +1070,7 @@ public class ProjectApplicationService {
         MultiProjectWaitQueueEntity multiTaskAndFixData = createMultiTaskAndFixData(projectStartMessageEntity);
         checkIfCanRunMulti(multiTaskAndFixData);
     }
+
     @SneakyThrows
 //    @Transactional
     public void stopMultiProject(MultiSimulationProjectKafkaParam projectKafkaParam) {
@@ -1087,10 +1097,10 @@ public class ProjectApplicationService {
         // 查看mysql项目是否已经运行
         // 如果不为空,则代表已经执行,为空则代表kafka还未被消费
         List<MultiSimulationProjectTaskRecordPO> recordPOList = taskRecordMapper.selectMultiSimulationProjectTaskRecordList(projectId);
-        if (!CollectionUtils.isEmpty(recordPOList)){
-            for (MultiSimulationProjectTaskRecordPO po: recordPOList) {
+        if (!CollectionUtils.isEmpty(recordPOList)) {
+            for (MultiSimulationProjectTaskRecordPO po : recordPOList) {
                 Integer recordStatus = po.getStatus();
-                if (recordStatus == MultiSimulationTaskStatusEnum.RUN_STATUS.getProjectStatus()){
+                if (recordStatus == MultiSimulationTaskStatusEnum.RUN_STATUS.getProjectStatus()) {
                     taskRecordMapper.updateMultiSimulationProjectTaskRecordStatusAndEnd(MultiSimulationTaskStatusEnum.PERSON_CANCEL_STATUS.getProjectStatus(), po.getId(), 0, System.currentTimeMillis());
                     // 删除pod
                     String nodeNameKey = "multi-taskId:" + po.getId();
@@ -1109,13 +1119,13 @@ public class ProjectApplicationService {
                 } else if (recordStatus == MultiSimulationTaskStatusEnum.INIT_STATUS.getProjectStatus()) {
                     // 初始状态时
                     taskRecordMapper.updateMultiSimulationProjectTaskRecordStatusAndEnd(MultiSimulationTaskStatusEnum.PERSON_CANCEL_STATUS.getProjectStatus(), po.getId(), 0, System.currentTimeMillis());
-                }else if (recordStatus == MultiSimulationTaskStatusEnum.NEED_ANALYSIS_STATUS.getProjectStatus() || recordStatus == MultiSimulationTaskStatusEnum.AUTO_TERMINATED_STATUS.getProjectStatus()) {
+                } else if (recordStatus == MultiSimulationTaskStatusEnum.NEED_ANALYSIS_STATUS.getProjectStatus() || recordStatus == MultiSimulationTaskStatusEnum.AUTO_TERMINATED_STATUS.getProjectStatus()) {
                     // 不作处理了
                 }
             }
             log.info("开始设置整个仿真项目:{}", projectId);
             // 设置整个仿真项目
-            if (status == MultiSimulationTaskStatusEnum.RUN_STATUS.getProjectStatus()){
+            if (status == MultiSimulationTaskStatusEnum.RUN_STATUS.getProjectStatus()) {
                 multiSimulationProjectParam.setProjectStatus(MultiSimulationStatusEnum.TERMINATED_STATUS.getProjectStatus());
                 multiSimulationProjectMapper.updateMultiSimulationProjectStatus(multiSimulationProjectParam);
             }
@@ -1147,22 +1157,22 @@ public class ProjectApplicationService {
             List<MultiSimulationSceneKafkaParam> kafkaParamList = projectStartMessageEntity.getKafkaParamList();
 
             MultiSimulationProjectVO projectVO = multiSimulationProjectMapper.selectMultiSimulationProjectById(projectId);
-            if (Objects.isNull(projectVO)){
+            if (Objects.isNull(projectVO)) {
                 throw new RuntimeException("未找到有效的仿真任务");
             }
             String simulationMageGroupId = projectVO.getSimulationMageGroupId();
             SimulationMageGroupPO groupPO = mageGroupMapper.selectSimulationMageGroupById(simulationMageGroupId);
-            if (Objects.isNull(groupPO) || StringUtils.isBlank(groupPO.getControllerPath()) || StringUtils.isBlank(groupPO.getConnectorPath())){
+            if (Objects.isNull(groupPO) || StringUtils.isBlank(groupPO.getControllerPath()) || StringUtils.isBlank(groupPO.getConnectorPath())) {
                 throw new RuntimeException("仿真镜像组无效");
             }
             List<MultiTaskMessageEntity> entityList = new ArrayList<>();
-            for (MultiSimulationSceneKafkaParam kafkaParam: kafkaParamList) {
+            for (MultiSimulationSceneKafkaParam kafkaParam : kafkaParamList) {
                 String taskId = StringUtil.getRandomUUID();
                 String mapId = kafkaParam.getMapId();
                 String sceneId = kafkaParam.getId();
                 String minioUploadPath = "multiResult/" + projectId + "/" + taskId + "/";
                 SimulationMapVO simulationMapVO = mapMapper.selectMapByMapId(mapId);
-                if (Objects.isNull(simulationMapVO)){
+                if (Objects.isNull(simulationMapVO)) {
                     throw new RuntimeException("地图" + mapId + "不存在");
                 }
                 String mapPath = simulationMapVO.getMapPath();
@@ -1198,11 +1208,11 @@ public class ProjectApplicationService {
                 JSONArray carArray = new JSONArray();
 
                 List<MultiSimulationSceneCarVO> simulationSceneCarVOList = kafkaParam.getSimulationSceneCarVOList();
-                if (CollectionUtils.isEmpty(simulationSceneCarVOList)){
+                if (CollectionUtils.isEmpty(simulationSceneCarVOList)) {
                     throw new RuntimeException("未配置车辆");
                 }
                 VehicleEntity vehicle = null;
-                for (MultiSimulationSceneCarVO sceneCar: simulationSceneCarVOList) {
+                for (MultiSimulationSceneCarVO sceneCar : simulationSceneCarVOList) {
                     String sceneCarId = sceneCar.getId();
                     // 处理算法id
                     String algorithmId = sceneCar.getAlgorithmId();
@@ -1212,16 +1222,16 @@ public class ProjectApplicationService {
 //                    result.put("sceneCarId-" + sceneCarId + "-docker-image", algorithmDockerImage);
                     customRedisClient.set(projectDomainService.getMultiAlgorithmIdRedisKey(algorithmId, projectId), algorithmDockerImage);
                     // 处理车辆,一个场景的车辆是一样的,只需取第一个车辆即可
-                    if (vehicle == null){
+                    if (vehicle == null) {
                         String carId = sceneCar.getCarId();
                         //1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
                         com.css.simulation.resource.scheduler.infra.entity.VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(carId);   // 车辆
                         List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(carId);    // 摄像头
                         List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(carId); // 完美传感器
                         vehicle = VehicleEntity.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build())
-                            .dynamics(DynamicsEntity.builder().dynamics_maxspeed(vehicleEntity.getMaxSpeed()).dynamics_enginepower(vehicleEntity.getEnginePower()).dynamics_maxdecel(vehicleEntity.getMaxDeceleration()).dynamics_maxsteering(vehicleEntity.getMaxSteeringAngle()).dynamics_mass(vehicleEntity.getMass()).dynamics_frontsurfaceeffective(vehicleEntity.getFrontSurfaceEffective()).dynamics_airdragcoefficient(vehicleEntity.getAirDragCoefficient()).dynamics_rollingresistance(vehicleEntity.getRollingResistanceCoefficient()).dynamics_wheeldiameter(vehicleEntity.getWheelDiameter()).dynamics_wheeldrive(vehicleEntity.getWheelDrive()).dynamics_overallefficiency(vehicleEntity.getOverallEfficiency()).dynamics_distfront(vehicleEntity.getFrontDistance()).dynamics_distrear(vehicleEntity.getRearDistance()).dynamics_distleft(vehicleEntity.getLeftDistance()).dynamics_distright(vehicleEntity.getRightDistance()).dynamics_distheight(vehicleEntity.getHeightDistance()).dynamics_wheelbase(vehicleEntity.getWheelbase()).build())
-                            .sensors(SensorsEntity.builder()   // 根据 vehicleId 查询绑定的传感器列表
-                                .camera(cameraEntityList).OGT(ogtEntityList).build()).build();
+                                .dynamics(DynamicsEntity.builder().dynamics_maxspeed(vehicleEntity.getMaxSpeed()).dynamics_enginepower(vehicleEntity.getEnginePower()).dynamics_maxdecel(vehicleEntity.getMaxDeceleration()).dynamics_maxsteering(vehicleEntity.getMaxSteeringAngle()).dynamics_mass(vehicleEntity.getMass()).dynamics_frontsurfaceeffective(vehicleEntity.getFrontSurfaceEffective()).dynamics_airdragcoefficient(vehicleEntity.getAirDragCoefficient()).dynamics_rollingresistance(vehicleEntity.getRollingResistanceCoefficient()).dynamics_wheeldiameter(vehicleEntity.getWheelDiameter()).dynamics_wheeldrive(vehicleEntity.getWheelDrive()).dynamics_overallefficiency(vehicleEntity.getOverallEfficiency()).dynamics_distfront(vehicleEntity.getFrontDistance()).dynamics_distrear(vehicleEntity.getRearDistance()).dynamics_distleft(vehicleEntity.getLeftDistance()).dynamics_distright(vehicleEntity.getRightDistance()).dynamics_distheight(vehicleEntity.getHeightDistance()).dynamics_wheelbase(vehicleEntity.getWheelbase()).build())
+                                .sensors(SensorsEntity.builder()   // 根据 vehicleId 查询绑定的传感器列表
+                                        .camera(cameraEntityList).OGT(ogtEntityList).build()).build();
                     }
                     String pathStart = sceneCar.getPathStart();
                     JSONObject jsonObject = JSONObject.parseObject(pathStart);
@@ -1241,30 +1251,30 @@ public class ProjectApplicationService {
                 FileUtil.rm(mapJsonLinuxPath);
 
                 MultiTaskMessageEntity build = MultiTaskMessageEntity.builder().info(MultiInfoEntity.builder().project_id(projectId).task_id(taskId).scene_id(sceneId)
-                        .default_time(Long.valueOf(projectStartMessageEntity.getDefaultTime()))
-                        .task_path(projectResultPathOfMinio + minioUploadPath).build())
-                    .scenario(ScenarioEntity.builder().scenario_osc(mapXmlPathOfMinio).scenario_odr(mapDriverPathOfMinio).scenario_osgb(mapOsgPathOfMinio).build())
-                    .vehicle(vehicle)
-                    .build();
+                                .default_time(Long.valueOf(projectStartMessageEntity.getDefaultTime()))
+                                .task_path(projectResultPathOfMinio + minioUploadPath).build())
+                        .scenario(ScenarioEntity.builder().scenario_osc(mapXmlPathOfMinio).scenario_odr(mapDriverPathOfMinio).scenario_osgb(mapOsgPathOfMinio).build())
+                        .vehicle(vehicle)
+                        .build();
                 entityList.add(build);
             }
             int sort = 0;
-            for (MultiTaskMessageEntity entity: entityList) {
+            for (MultiTaskMessageEntity entity : entityList) {
                 int i = taskRecordMapper.addMultiSimulationProjectTaskRecord(entity.getInfo().getTask_id(), entity.getInfo().getScene_id(), entity.getInfo().getProject_id(),
-                    JSONObject.toJSONString(entity), sort);
-                sort ++;
+                        JSONObject.toJSONString(entity), sort);
+                sort++;
             }
             log.info("project:{},共插入{}条数据", projectId, entityList.size());
             MultiProjectWaitQueueEntity build = MultiProjectWaitQueueEntity.builder()
-                .multiTaskMessageEntityList(entityList)
-                .projectId(projectId)
-                .connectorPath(groupPO.getConnectorPath())
-                .controllerPath(groupPO.getControllerPath())
-                .softwarePath(groupPO.getSoftwarePath())
-                .waitingParallelism(entityList.size())
-                .kafkaParamList(kafkaParamList)
-                .runState(-1)
-                .build();
+                    .multiTaskMessageEntityList(entityList)
+                    .projectId(projectId)
+                    .connectorPath(groupPO.getConnectorPath())
+                    .controllerPath(groupPO.getControllerPath())
+                    .softwarePath(groupPO.getSoftwarePath())
+                    .waitingParallelism(entityList.size())
+                    .kafkaParamList(kafkaParamList)
+                    .runState(-1)
+                    .build();
             log.info("MultiProjectWaitQueueEntity返回结果:{}", JSONObject.toJSONString(build));
             return build;
         } catch (Exception e) {
@@ -1275,7 +1285,6 @@ public class ProjectApplicationService {
     }
 
 
-
     @SneakyThrows
     // TODO 此处加锁
     public void checkIfCanRunMulti(MultiProjectWaitQueueEntity projectWaitQueueEntity) {
@@ -1283,19 +1292,19 @@ public class ProjectApplicationService {
 //        List<MultiTaskMessageEntity> multiTaskMessageEntityList = projectWaitQueueEntity.getMultiTaskMessageEntityList();
         //1 项目信息
         int parallelism = projectWaitQueueEntity.getWaitingParallelism();
-        if (parallelism <=0){
+        if (parallelism <= 0) {
             log.info("需要只需的项目并行度为0");
         }
         String isChoiceGpu = DictConstants.USE_GPU;
         UserEntity userEntity = null;
         try {
             userEntity = projectDomainService.getUserEntityByMultiProjectId(projectWaitQueueEntity.getProjectId());
-            if (userEntity == null){
+            if (userEntity == null) {
                 projectWaitQueueEntity.setWaitingParallelism(0);
                 log.info("多模式仿真任务未查询到创建人,取消执行该任务:{}", projectWaitQueueEntity.getProjectId());
                 waitMulti(projectWaitQueueEntity);
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             log.info("多模式仿真处理失败,查询用户异常", e);
             return;
         }
@@ -1326,7 +1335,7 @@ public class ProjectApplicationService {
             throw new RuntimeException("未知角色类型:" + roleCode);
         }
         int remainderSimulationLicense = Integer.MAX_VALUE;
-        if (!clusterUserId.equals(DictConstants.SYSTEM_USER_ID)){
+        if (!clusterUserId.equals(DictConstants.SYSTEM_USER_ID)) {
             // 获取仿真软件证书数量和动力学软件证书数量(vtd占一个仿真证书,carsim各占一个)
             Integer usingSimulationLicenseNumber = projectDomainService.getUsingLicenseNumber(clusterUserId, DictConstants.LICENSE_TYPE_SIMULATION);
             Integer numSimulationLicense = clusterEntity.getNumSimulationLicense();
@@ -1340,10 +1349,10 @@ public class ProjectApplicationService {
         int remainderParallelism = projectDomainService.getRemainderMultiParallelism(isChoiceGpu);
         log.info("计算出剩余可执行的并行度:{}", remainderParallelism);
         boolean needWait = false;
-        if (DictConstants.SYSTEM_USER_ID.equals(clusterUserId)){
+        if (DictConstants.SYSTEM_USER_ID.equals(clusterUserId)) {
             // 不需要判断证书
-        } else{
-            if (remainderSimulationLicense <= 0){
+        } else {
+            if (remainderSimulationLicense <= 0) {
                 log.info("multiProjectId:{},仿真数量不够用clusterUserId:{}", projectWaitQueueEntity.getProjectId(), clusterUserId);
                 needWait = true;
             }
@@ -1363,11 +1372,11 @@ public class ProjectApplicationService {
             waitMulti(projectWaitQueueEntity);
         } else if (remainderParallelism < parallelism) {
             // 初始执行
-            if (projectWaitQueueEntity.getRunState() <0){
+            if (projectWaitQueueEntity.getRunState() < 0) {
                 log.info("多模式仿真初始执行,创建kafka topic:{}", projectWaitQueueEntity.getProjectId());
                 KafkaUtil.createTopic(kafkaAdminClient, projectWaitQueueEntity.getProjectId(), projectWaitQueueEntity.getMultiTaskMessageEntityList().size(), (short) 1);   // 创建主题
                 TimeUnit.SECONDS.sleep(5);
-                if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)){
+                if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)) {
                     projectDomainService.useLicense(clusterUserId, 1);
                 }
             }
@@ -1377,27 +1386,27 @@ public class ProjectApplicationService {
             Integer runState = projectWaitQueueEntity.getRunState();
             int runSt = remainderParallelism + runState;
             waitMulti(MultiProjectWaitQueueEntity.builder().waitingParallelism(parallelism - remainderParallelism)
-                .runState(runSt)
-                .projectId(projectWaitQueueEntity.getProjectId())
-                .softwarePath(projectWaitQueueEntity.getSoftwarePath())
-                .connectorPath(projectWaitQueueEntity.getConnectorPath())
-                .controllerPath(projectWaitQueueEntity.getControllerPath())
-                .multiTaskMessageEntityList(projectWaitQueueEntity.getMultiTaskMessageEntityList())
-                .kafkaParamList(projectWaitQueueEntity.getKafkaParamList())
-                .build());
+                    .runState(runSt)
+                    .projectId(projectWaitQueueEntity.getProjectId())
+                    .softwarePath(projectWaitQueueEntity.getSoftwarePath())
+                    .connectorPath(projectWaitQueueEntity.getConnectorPath())
+                    .controllerPath(projectWaitQueueEntity.getControllerPath())
+                    .multiTaskMessageEntityList(projectWaitQueueEntity.getMultiTaskMessageEntityList())
+                    .kafkaParamList(projectWaitQueueEntity.getKafkaParamList())
+                    .build());
         } else {
-            if (projectWaitQueueEntity.getRunState() <0){
+            if (projectWaitQueueEntity.getRunState() < 0) {
                 log.info("多模式仿真初始执行,剩余并行度够用,创建kafka topic:{}", projectWaitQueueEntity.getProjectId());
                 KafkaUtil.createTopic(kafkaAdminClient, projectWaitQueueEntity.getProjectId(), projectWaitQueueEntity.getMultiTaskMessageEntityList().size(), (short) 1);   // 创建主题
                 TimeUnit.SECONDS.sleep(5);
-                if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)){
+                if (!DictConstants.SYSTEM_USER_ID.equals(clusterUserId)) {
                     projectDomainService.useLicense(clusterUserId, 1);
                 }
             }
             runMulti(parallelism, projectWaitQueueEntity, isChoiceGpu);
             // 能执行完也需要删除之前redis key
             waitMulti(MultiProjectWaitQueueEntity.builder().waitingParallelism(0)
-                    .runState(projectWaitQueueEntity.getMultiTaskMessageEntityList().size() -1)
+                    .runState(projectWaitQueueEntity.getMultiTaskMessageEntityList().size() - 1)
                     .projectId(projectWaitQueueEntity.getProjectId())
                     .softwarePath(projectWaitQueueEntity.getSoftwarePath())
                     .connectorPath(projectWaitQueueEntity.getConnectorPath())

+ 20 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/fs/minio/MinioUtil.java

@@ -195,6 +195,26 @@ public class MinioUtil {
         }
     }
 
+    public static void copyFile(MinioClient minioClient, String bucket, String sourceObject, String targetObject) {
+        try {
+            boolean objectExist = isObjectExist(minioClient, bucket, sourceObject);
+            if (objectExist) {
+                minioClient.copyObject(CopyObjectArgs.builder()
+                        .bucket(bucket)
+                        .object(targetObject)
+                        .source(CopySource.builder()
+                                .bucket(bucket)
+                                .object(sourceObject)
+                                .build())
+                        .build());
+            } else {
+                throw new RuntimeException("copyFile() Minio 文件 " + sourceObject + " 不存在。");
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("复制文件失败:" + e.getMessage(), e);
+        }
+    }
+
     /**
      * 下载文件
      */