Bläddra i källkod

Merge remote-tracking branch 'origin/20240309-saq-fix' into lcytest

李春阳 1 år sedan
förälder
incheckning
327968ee2c

+ 8 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/service/ProjectApplicationService.java

@@ -516,7 +516,9 @@ public class ProjectApplicationService {
                 waitingQueue = JsonUtil.jsonToList(waitingQueueJson, MultiProjectWaitQueueEntity.class);
             }
             boolean contains = false;
-            for (MultiProjectWaitQueueEntity waitQueueEntity : waitingQueue) {
+//            for (MultiProjectWaitQueueEntity waitQueueEntity : waitingQueue) {
+            for (int i = 0; i < waitingQueue.size(); i++) {
+                MultiProjectWaitQueueEntity waitQueueEntity = waitingQueue.get(i);
                 if (waitQueueEntity.getProjectId().equals(multiProjectWaitQueue.getProjectId())) {
                     contains = true;
                     if (multiProjectWaitQueue.getWaitingParallelism() > 0){
@@ -525,7 +527,7 @@ public class ProjectApplicationService {
                         waitQueueEntity.setRunState(multiProjectWaitQueue.getRunState());
                     }else {
                         // 项目等待为0,则删除
-                        waitingQueue.remove(waitQueueEntity);
+                        waitingQueue.remove(i);
                     }
                 }
             }
@@ -578,6 +580,7 @@ public class ProjectApplicationService {
         Map<String, Integer> multiNodeMapToUse = projectDomainService.getMultiNodeMapToUse(isChoiceGpu, parallel);
         List<MultiCreateYamlRet> yamlList = new ArrayList<>();
         for (int i = runState + 1; i < parallel + runState + 1; i++) {
+            log.info("执行第:{}个任务,projectId:{}", i, projectId);
             MultiTaskMessageEntity messageEntity = multiTaskMessageEntityList.get(i);
             String taskId = messageEntity.getInfo().getTask_id();
             // 发送kafka消息
@@ -1232,6 +1235,7 @@ public class ProjectApplicationService {
     @SneakyThrows
     // TODO 此处加锁
     public void checkIfCanRunMulti(MultiProjectWaitQueueEntity projectWaitQueueEntity) {
+        log.info("开始尝试执行任务:{}", projectWaitQueueEntity.getProjectId());
 //        List<MultiTaskMessageEntity> multiTaskMessageEntityList = projectWaitQueueEntity.getMultiTaskMessageEntityList();
         //1 项目信息
         int parallelism = projectWaitQueueEntity.getWaitingParallelism();
@@ -1319,6 +1323,7 @@ public class ProjectApplicationService {
             int runSt = remainderParallelism + runState;
             waitMulti(MultiProjectWaitQueueEntity.builder().waitingParallelism(parallelism - remainderParallelism)
                 .runState(runSt)
+                .projectId(projectWaitQueueEntity.getProjectId())
                 .multiTaskMessageEntityList(projectWaitQueueEntity.getMultiTaskMessageEntityList())
                 .kafkaParamList(projectWaitQueueEntity.getKafkaParamList())
                 .build());
@@ -1335,6 +1340,7 @@ public class ProjectApplicationService {
             // 能执行完也需要删除之前redis key
             waitMulti(MultiProjectWaitQueueEntity.builder().waitingParallelism(0)
                     .runState(projectWaitQueueEntity.getMultiTaskMessageEntityList().size() -1)
+                    .projectId(projectWaitQueueEntity.getProjectId())
                     .kafkaParamList(projectWaitQueueEntity.getKafkaParamList())
                     .multiTaskMessageEntityList(projectWaitQueueEntity.getMultiTaskMessageEntityList()).build()
             );

+ 7 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/service/TaskApplicationService.java

@@ -235,7 +235,6 @@ public class TaskApplicationService {
             log.info("多模式仿真收到不存在的任务的状态消息:" + taskId);
             return;
         }
-
         String projectId = taskEntity.getProjectId(); // 项目 id
         String minioUploadPath = projectId + "/" + taskId + "/";
 
@@ -290,7 +289,13 @@ public class TaskApplicationService {
                     String allMp4Url = null;
                     String simulationMp4Url = null;
                     for (String str : list) {
+                        if (StringUtils.contains(str, "/log/") || StringUtils.contains(str, "/result/")){
+                            continue;
+                        }
                         String split = str.substring(str.lastIndexOf("/") + 1);
+                        if (StringUtils.equals(split, ".csv")){
+                            continue;
+                        }
                         if (StringUtils.contains(split, "xml") || StringUtils.contains(split, "xodr") || StringUtils.contains(split, "osgb")){
                             continue;
                         }
@@ -306,6 +311,7 @@ public class TaskApplicationService {
                             csvName = split;
                             String linuxFile = linuxPath + split;
                             String minioPathCsv = taskPath + split;
+                            log.info("下载minio文件:{},path:{}", minioPathCsv, linuxFile);
                             MinioUtil.downloadToFile(minioClient, bucketName, minioPathCsv, linuxFile);
                         }
                     }
@@ -386,7 +392,6 @@ public class TaskApplicationService {
                 multiSimulationProjectParam.setProjectStatus(MultiSimulationStatusEnum.COMPLETED_STATUS.getProjectStatus());
                 multiSimulationProjectMapper.updateMultiSimulationProjectStatus(multiSimulationProjectParam);
                 // 删除kafka topic
-                // todo 暂时不删除
                 KafkaUtil.deleteTopic(kafkaAdminClient, projectId);
                 //6 删除项目 pod 启动文件
                 FileUtil.deleteFileBySubstring(multiPodYamlDirectory, projectId);

+ 9 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/domain/service/ProjectDomainService.java

@@ -8,6 +8,7 @@ import api.common.pojo.vo.project.MultiSimulationSceneCarVO;
 import api.common.util.*;
 import com.alibaba.fastjson.JSONObject;
 import com.css.simulation.resource.scheduler.adapter.entity.ProjectStartMessageEntity;
+import com.css.simulation.resource.scheduler.app.entity.MultiProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.app.entity.ProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.infra.configuration.custom.CustomConfiguration;
 import com.css.simulation.resource.scheduler.infra.configuration.entity.NodeEntity;
@@ -1303,6 +1304,14 @@ public class ProjectDomainService {
             return null;
         }
     }
+    public List<MultiProjectWaitQueueEntity> getMultiWaitQueue() {
+        final String waitQueueJson = customRedisClient.get(DictConstants.MULTI_PROJECT_WAIT_QUEUE_KEY);
+        if (StringUtil.isNotEmpty(waitQueueJson)) {
+            return JsonUtil.jsonToList(waitQueueJson, MultiProjectWaitQueueEntity.class);
+        } else {
+            return null;
+        }
+    }
 
     public void setWaitQueue(List<ProjectWaitQueueEntity> projectWaitQueueEntities) {
         try {

+ 16 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/scheduler/ProjectScheduler.java

@@ -3,6 +3,7 @@ package com.css.simulation.resource.scheduler.infra.scheduler;
 import api.common.pojo.constants.DictConstants;
 import api.common.util.CollectionUtil;
 import com.css.simulation.resource.scheduler.adapter.entity.ProjectStartMessageEntity;
+import com.css.simulation.resource.scheduler.app.entity.MultiProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.app.entity.ProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.app.service.ProjectApplicationService;
 import com.css.simulation.resource.scheduler.domain.service.ProjectDomainService;
@@ -50,4 +51,19 @@ public class ProjectScheduler {
         }
     }
 
+    @Scheduled(fixedDelay = 2 * 60 * 1000)
+    public void dispatchMultiProject() {
+        List<MultiProjectWaitQueueEntity> projectWaitQueue = projectDomainService.getMultiWaitQueue();
+        if (CollectionUtil.isNotEmpty(projectWaitQueue)) {
+            MultiProjectWaitQueueEntity projectWaitQueueEntity = projectWaitQueue.get(0);
+            Integer waitingParallelism = projectWaitQueueEntity.getWaitingParallelism();
+            if (waitingParallelism > 0){
+                projectApplicationService.checkIfCanRunMulti(projectWaitQueueEntity);
+            }else {
+                log.info("无剩余等待任务,删除:{}", projectWaitQueueEntity.getProjectId());
+                projectApplicationService.waitMulti(projectWaitQueueEntity);
+            }
+        }
+    }
+
 }

+ 30 - 4
simulation-resource-server/src/main/java/com/css/simulation/resource/server/app/impl/MultiSimulationProjectServiceImpl.java

@@ -47,6 +47,7 @@ import javax.servlet.http.HttpServletResponse;
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URLEncoder;
 import java.util.*;
 import java.util.zip.ZipEntry;
@@ -274,9 +275,11 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
     public ResponseBodyVO<MultiSimulationProjectVO> submitMultiSimulationProjectDetail(MultiSimulationProjectParam param){
         String currentUserId = AuthUtil.getCurrentUserId();
         param.setProjectUserId(currentUserId);
+        param.setProjectStatus(MultiSimulationStatusEnum.RUN_STATUS.getProjectStatus());
         // 先更新
         multiSimulationProjectMapper.updateMultiSimulationProject(param);
-        Integer status = param.getProjectStatus();
+        MultiSimulationProjectVO projectVO = multiSimulationProjectMapper.selectMultiSimulationProjectById(param.getProjectId());
+        Integer status = projectVO.getProjectStatus();
         if (status != MultiSimulationStatusEnum.INIT_STATUS.getProjectStatus()){
             return new ResponseBodyVO<>(ResponseBodyVO.Response.CLIENT_FAILURE, "当前任务状态不允许执行此操作");
         }
@@ -748,9 +751,12 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
                 }
             }
         }
+        ResponseBodyVO<String> overUrlRes = fileDownService.getPublicUrl(sceneVO.getProjectResultOverallUrl());
+        ResponseBodyVO<String> simUrlRes = fileDownService.getPublicUrl(sceneVO.getProjectResultSimulationUrl());
+
         MultiSimulationSceneResultVO multiSimulationSceneResultVO = new MultiSimulationSceneResultVO();
-        multiSimulationSceneResultVO.setId(param.getSceneId()).setProjectId(sceneVO.getProjectId()).setProjectResultSimulationUrl(sceneVO.getProjectResultSimulationUrl())
-            .setProjectResultOverallUrl(sceneVO.getProjectResultOverallUrl()).setSceneName(sceneVO.getSceneName())
+        multiSimulationSceneResultVO.setId(param.getSceneId()).setProjectId(sceneVO.getProjectId()).setProjectResultSimulationUrl(simUrlRes.isStatus() ? simUrlRes.getInfo() : "")
+            .setProjectResultOverallUrl(overUrlRes.isStatus() ? overUrlRes.getInfo() : "").setSceneName(sceneVO.getSceneName())
             .setPhrases(phrases)
 //            .setCollisionTimes(collisionDetail.size())
 //            .setAbnormalParkingTimes(abnormalParkingDetail.size()).setOutOfPavementTimes(outOfPavementDetail.size())
@@ -822,6 +828,24 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
         return new ResponseBodyVO<>(ResponseBodyVO.Response.SUCCESS);
     }
 
+//    private void downloadDependFile(String minioPath, String localPath) throws IOException {
+//         FileDownService fileDownService = ApplicationContextAwareImpl.getApplicationContext().getBean(FileDownService.class);
+//         Response download = fileDownService.download(MinioParameter.builder().objectName(minioPath).build());
+//         Response.Body body = download.body();
+//         InputStream inputStream;
+//         inputStream = body.asInputStream();
+//         File file = new File(localPath);
+//         OutputStream outputStream = Files.newOutputStream(file.toPath());
+//         byte[] buffer = new byte[1024];
+//         int bytesRead;
+//         while ((bytesRead = inputStream.read(buffer)) != -1) {
+//         outputStream.write(buffer, 0, bytesRead);
+//         }
+//         inputStream.close();
+//         outputStream.close();
+//         download.close();
+//    }
+
     @SneakyThrows
     public void exportProjectTaskFileById(MultiSimulationProjectParam param){
         String projectId = param.getProjectId();
@@ -886,11 +910,13 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
                         Response.Body body = down.body();
                         ZipEntry entry2 = new ZipEntry(zipPath);
                         zos.putNextEntry(entry2);
-                        BufferedInputStream in = new BufferedInputStream(body.asInputStream());
+//                        BufferedInputStream in = new BufferedInputStream(body.asInputStream());
+                        InputStream in = body.asInputStream();
                         while ((len = in.read(buffer)) != -1) {
                             zos.write(buffer, 0, len);
                         }
                         in.close();
+
                     }else if ("Ego.csv".equals(fileName) || "evaluation.csv".equals(fileName)) {
                         MinioParameter minioPar = new MinioParameter();
                         minioPar.setObjectName(s);

+ 1 - 1
simulation-resource-server/src/main/java/com/css/simulation/resource/server/infra/db/mysql/mapper/MultiSimulationProjectTaskRecordMapper.java

@@ -8,7 +8,7 @@ import java.util.List;
 @Mapper
 public interface MultiSimulationProjectTaskRecordMapper {
 
-    @Select("select id,project_id,scene_id,status from multi_simulation_project_task_record where project_id = #{projectId} and deleted = 0")
+    @Select("select id,project_id,scene_id,status,task_body from multi_simulation_project_task_record where project_id = #{projectId} and deleted = 0")
     List<MultiSimulationProjectTaskRecordPO> selectMultiSimulationProjectTaskRecordList(@Param("projectId") String projectId);
 
 }