夜得朦胧 1 år sedan
förälder
incheckning
9e66bee772

+ 4 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/app/service/ProjectApplicationService.java

@@ -578,6 +578,7 @@ public class ProjectApplicationService {
         Map<String, Integer> multiNodeMapToUse = projectDomainService.getMultiNodeMapToUse(isChoiceGpu, parallel);
         List<MultiCreateYamlRet> yamlList = new ArrayList<>();
         for (int i = runState + 1; i < parallel + runState + 1; i++) {
+            log.info("执行第:{}个任务,projectId:{}", i, projectId);
             MultiTaskMessageEntity messageEntity = multiTaskMessageEntityList.get(i);
             String taskId = messageEntity.getInfo().getTask_id();
             // 发送kafka消息
@@ -1232,6 +1233,7 @@ public class ProjectApplicationService {
     @SneakyThrows
     // TODO 此处加锁
     public void checkIfCanRunMulti(MultiProjectWaitQueueEntity projectWaitQueueEntity) {
+        log.info("开始尝试执行任务:{}", projectWaitQueueEntity.getProjectId());
 //        List<MultiTaskMessageEntity> multiTaskMessageEntityList = projectWaitQueueEntity.getMultiTaskMessageEntityList();
         //1 项目信息
         int parallelism = projectWaitQueueEntity.getWaitingParallelism();
@@ -1319,6 +1321,7 @@ public class ProjectApplicationService {
             int runSt = remainderParallelism + runState;
             waitMulti(MultiProjectWaitQueueEntity.builder().waitingParallelism(parallelism - remainderParallelism)
                 .runState(runSt)
+                .projectId(projectWaitQueueEntity.getProjectId())
                 .multiTaskMessageEntityList(projectWaitQueueEntity.getMultiTaskMessageEntityList())
                 .kafkaParamList(projectWaitQueueEntity.getKafkaParamList())
                 .build());
@@ -1335,6 +1338,7 @@ public class ProjectApplicationService {
             // 能执行完也需要删除之前redis key
             waitMulti(MultiProjectWaitQueueEntity.builder().waitingParallelism(0)
                     .runState(projectWaitQueueEntity.getMultiTaskMessageEntityList().size() -1)
+                    .projectId(projectWaitQueueEntity.getProjectId())
                     .kafkaParamList(projectWaitQueueEntity.getKafkaParamList())
                     .multiTaskMessageEntityList(projectWaitQueueEntity.getMultiTaskMessageEntityList()).build()
             );

+ 9 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/domain/service/ProjectDomainService.java

@@ -8,6 +8,7 @@ import api.common.pojo.vo.project.MultiSimulationSceneCarVO;
 import api.common.util.*;
 import com.alibaba.fastjson.JSONObject;
 import com.css.simulation.resource.scheduler.adapter.entity.ProjectStartMessageEntity;
+import com.css.simulation.resource.scheduler.app.entity.MultiProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.app.entity.ProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.infra.configuration.custom.CustomConfiguration;
 import com.css.simulation.resource.scheduler.infra.configuration.entity.NodeEntity;
@@ -1303,6 +1304,14 @@ public class ProjectDomainService {
             return null;
         }
     }
+    public List<MultiProjectWaitQueueEntity> getMultiWaitQueue() {
+        final String waitQueueJson = customRedisClient.get(DictConstants.MULTI_PROJECT_WAIT_QUEUE_KEY);
+        if (StringUtil.isNotEmpty(waitQueueJson)) {
+            return JsonUtil.jsonToList(waitQueueJson, MultiProjectWaitQueueEntity.class);
+        } else {
+            return null;
+        }
+    }
 
     public void setWaitQueue(List<ProjectWaitQueueEntity> projectWaitQueueEntities) {
         try {

+ 16 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/infra/scheduler/ProjectScheduler.java

@@ -3,6 +3,7 @@ package com.css.simulation.resource.scheduler.infra.scheduler;
 import api.common.pojo.constants.DictConstants;
 import api.common.util.CollectionUtil;
 import com.css.simulation.resource.scheduler.adapter.entity.ProjectStartMessageEntity;
+import com.css.simulation.resource.scheduler.app.entity.MultiProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.app.entity.ProjectWaitQueueEntity;
 import com.css.simulation.resource.scheduler.app.service.ProjectApplicationService;
 import com.css.simulation.resource.scheduler.domain.service.ProjectDomainService;
@@ -50,4 +51,19 @@ public class ProjectScheduler {
         }
     }
 
+    @Scheduled(fixedDelay = 2 * 60 * 1000)
+    public void dispatchMultiProject() {
+        List<MultiProjectWaitQueueEntity> projectWaitQueue = projectDomainService.getMultiWaitQueue();
+        if (CollectionUtil.isNotEmpty(projectWaitQueue)) {
+            MultiProjectWaitQueueEntity projectWaitQueueEntity = projectWaitQueue.get(0);
+            Integer waitingParallelism = projectWaitQueueEntity.getWaitingParallelism();
+            if (waitingParallelism > 0){
+                projectApplicationService.checkIfCanRunMulti(projectWaitQueueEntity);
+            }else {
+                log.info("无剩余等待任务,删除:{}", projectWaitQueueEntity.getProjectId());
+                projectApplicationService.waitMulti(projectWaitQueueEntity);
+            }
+        }
+    }
+
 }