martin 3 年之前
父节点
当前提交
e419363f68

+ 3 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -144,8 +144,9 @@ public class ManualProjectConsumer {
                 redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":waiting:" + projectId, projectJson);
             }
         } else {
-            log.info("ManualProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
-            redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":waiting:" + projectId, projectJson);
+            log.info("ManualProjectConsumer--cacheManualProject 集群 " + clusterId + " 将项目 " + projectId + " 执行!");
+            redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":running:" + projectId, projectJson);
+            parseManualProject(projectJson);
         }
     }
 

+ 3 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -49,7 +49,6 @@ public class TaskManager {
     ManualProjectMapper manualProjectMapper;
     @Autowired
     KafkaTemplate<String, String> kafkaTemplate;
-
     @Autowired
     TaskIndexManager taskIndexManager;
     @Autowired
@@ -170,11 +169,12 @@ public class TaskManager {
         return true;
     }
 
-    public void prepareScore(String userId, String projectId) {
+    public String prepareScore(String userId, String projectId) {
         log.info("TaskManager--prepareScore 项目 " + projectId + "准备打分!");
         ClusterPO clusterPO = clusterMapper.selectByUserId(userId);
         String clusterId = clusterPO.getId();
         stringRedisTemplate.delete(manualProjectTopic + ":cluster:" + clusterId + ":running" + projectId);
+        return clusterId;
     }
 
     @SneakyThrows
@@ -321,8 +321,7 @@ public class TaskManager {
         // 调用 server 的接口,计算评价等级
         evaluationLevel(projectId);
         log.info("TaskManager--score 项目 " + projectId + " 打分完成!");
-        manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成
-        log.info("TaskManager--score 项目 " + projectId + " 执行完成!");
+
 
     }
 

+ 13 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -65,8 +65,8 @@ public class ProjectScheduler {
     @Scheduled(fixedDelay = 60 * 1000)
     @SneakyThrows
     public void dispatchProject() {
-        //1 查询等待执行的项目
-        List<ProjectPO> projectList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_WAITING);
+        //1 查询已经排队的项目,即已经在页面上点击运行
+        List<ProjectPO> projectList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
         for (ProjectPO project : projectList) {
             String projectId = project.getId();
             String userId = project.getCreateUserId();
@@ -76,6 +76,9 @@ public class ProjectScheduler {
                 return;
             }
             String clusterId = clusterPO.getId();
+            if (StringUtil.isNotEmpty(redisTemplate.opsForValue().get(manualProjectTopic + ":cluster:" + clusterId + ":running:" + projectId))) {
+                continue; // 判断项目是否已经在执行,如果执行则 continue
+            }
             int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
             // 获取该用户正在运行的项目数量
             Set<String> runningProjectSet = redisTemplate.keys(manualProjectTopic + ":cluster:" + clusterId + ":running:" + "*");
@@ -105,7 +108,15 @@ public class ProjectScheduler {
                         }
                     }
                 }
+            } else {
+                String projectJson = redisTemplate.opsForValue().get(manualProjectTopic + ":cluster:" + clusterId + ":waiting:" + projectId);
+                redisTemplate.delete(manualProjectTopic + ":cluster:" + clusterId + ":waiting:" + projectId);
+                assert projectJson != null;
+                redisTemplate.opsForValue().set(manualProjectTopic + ":cluster:" + clusterId + ":running:" + projectId, projectJson);
+                log.info("ProjectScheduler--dispatchProject 项目 " + projectId + " 从等待队列进入执行状态!");
+                manualProjectConsumer.parseManualProject(projectJson);
             }
+
         }
 
     }

+ 9 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -1,8 +1,11 @@
 package com.css.simulation.resource.scheduler.service;
 
+import api.common.pojo.constants.DictConstants;
 import api.common.util.SshUtil;
+import api.common.util.TimeUtil;
 import com.css.simulation.resource.scheduler.manager.TaskManager;
 import com.css.simulation.resource.scheduler.mapper.IndexTemplateMapper;
+import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import io.minio.MinioClient;
@@ -42,6 +45,8 @@ public class TaskService {
     String bucketName;
     @Autowired
     TaskMapper taskMapper;
+    @Autowired
+    ManualProjectMapper manualProjectMapper;
 
 
     @SneakyThrows
@@ -65,14 +70,16 @@ public class TaskService {
         }
 
         //2 准备打分
-        taskManager.prepareScore(userId, projectId);
+        String clusterId = taskManager.prepareScore(userId, projectId);
 
         //3 打分
         taskManager.score(userId, projectId,session);
 
-
         // -------------------------------- 收尾 --------------------------------
+        manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成
+        log.info("TaskManager--score 项目 " + projectId + " 执行完成!");
 
+        stringRedisTemplate.delete(manualProjectTopic + ":cluster:" + clusterId + ":running:" + projectId);
         // 删除所有 key
 //        Set<String> keys = redisTemplate.keys("manualProject:" + projectId + "*");
 //        assert keys != null;