martin 3 years ago
parent
commit
6476b97d55

+ 6 - 6
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -7,7 +7,7 @@ import api.common.util.*;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
-import com.css.simulation.resource.scheduler.service.ManualProjectService;
+import com.css.simulation.resource.scheduler.service.ProjectService;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -76,7 +76,7 @@ public class ProjectConsumer {
     @Autowired
     ClusterMapper clusterMapper;
     @Autowired
-    ManualProjectService manualProjectService;
+    ProjectService projectService;
     @Autowired
     ProjectUtil projectUtil;
 
@@ -190,10 +190,10 @@ public class ProjectConsumer {
         String userId = manualProjectMapper.selectCreateUserById(projectId);
         Long parallelism = projectMessageDTO.getParallelism();    // 并行度
         //2 执行前准备,删除改项目下所有任务,即重新执行改项目时需要新的测试包
-        manualProjectService.prepare(clusterPrefix, projectId, projectJson);
+        projectService.prepare(clusterPrefix, projectId, projectJson);
         // -------------------------------- 1 查询场景 --------------------------------
         //1-1 根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
-        List<ScenePO> scenePOList = manualProjectService.handlePackage(projectRunningPrefix, projectId, packageId);
+        List<ScenePO> scenePOList = projectService.handlePackage(projectRunningPrefix, projectId, packageId);
         Set<ScenePO> scenePOSet = new HashSet<>(scenePOList); // 如果不去重的话会出现多个场景重复关联多个指标
         // -------------------------------- 2 查询模型 --------------------------------
         //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
@@ -201,9 +201,9 @@ public class ProjectConsumer {
         List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId);    // 摄像头
         List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
         // -------------------------------- 3 发送任务消息 --------------------------------
-        manualProjectService.sendTaskMessage(projectRunningPrefix, userId, projectId, maxSimulationTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
+        projectService.sendTaskMessage(projectRunningPrefix, userId, projectId, maxSimulationTime, scenePOSet, vehiclePO, cameraPOList, ogtPOList);
         // -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
-        String algorithmDockerImage = manualProjectService.handleAlgorithm(projectId, algorithmId);
+        String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
         int completions = scenePOList.size();     // 结束标
         log.info("ProjectConsumer--parseManualProject 项目 " + projectId + " 的完成度为:" + completions);

+ 39 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -6,7 +6,10 @@ import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
 import com.css.simulation.resource.scheduler.mapper.IndexMapper;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-import com.css.simulation.resource.scheduler.pojo.po.*;
+import com.css.simulation.resource.scheduler.pojo.po.IndexTemplatePO;
+import com.css.simulation.resource.scheduler.pojo.po.LeafIndexPO;
+import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
+import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.pojo.to.ScoreTO;
 import com.css.simulation.resource.scheduler.util.MinioUtil;
@@ -18,6 +21,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -175,7 +179,7 @@ public class TaskManager {
     }
 
     @SneakyThrows
-    public void score(PrefixTO redisPrefix,String userId, String projectId, ClientSession session) {
+    public void score(PrefixTO redisPrefix, String userId, String projectId, ClientSession session) {
         // -------------------------------- 打分 --------------------------------
         ProjectPO projectPO = manualProjectMapper.selectById(projectId);
         String packageId = projectPO.getScenePackageId();  // 场景测试包 id,指标根 id
@@ -415,4 +419,37 @@ public class TaskManager {
     }
 
 
+    @SneakyThrows
+    public void done(PrefixTO redisPrefix, SshClient sshClient, ClientSession clientSession, String projectId) {
+
+        manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成
+
+        Set<String> keys = stringRedisTemplate.keys(redisPrefix.getProjectRunningKey() + "*");
+        if (CollectionUtil.isNotEmpty(keys)) {
+            keys.forEach(key -> stringRedisTemplate.delete(key));
+        } else {
+            log.error("TaskService--taskState 前缀为 " + redisPrefix.getProjectRunningKey() + " 的 key 为空!");
+        }
+
+        // 删除所有 key
+//        Set<String> keys = redisTemplate.keys("manualProject:" + projectId + "*");
+//        assert keys != null;
+//        redisTemplate.delete(keys);
+//        log.info("------- /state 任务 " + taskId + " 的父项目为:" + projectId);
+
+
+        // 删除 kafka topic
+//        SshClient clientKafka = SshUtil.getClient();
+//        ClientSession sessionKafka = SshUtil.getSession(clientKafka, hostnameKafka, usernameKafka, passwordKafka);
+//        String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", projectId);
+//        SshUtil.execute(sessionKafka, topicDeleteCommand);
+//        SshUtil.stop(clientKafka, sessionKafka);
+
+
+        // 删除 job
+        SshUtil.execute(clientSession, "kubectl delete job project-" + projectId);
+        clientSession.close();
+        sshClient.stop();
+    }
+
 }

+ 2 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -90,8 +90,10 @@ public class ProjectScheduler {
             int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
             // 获取该用户正在运行的项目数量
             Set<String> runningProjectSet = redisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
+
             if (CollectionUtil.isNotEmpty(runningProjectSet)) {
                 long parallelismSum = 0;
+                // cluster:${clusterId}:running:${projectId}
                 for (String runningProjectKey : runningProjectSet) {
                     parallelismSum += JsonUtil.jsonToBean(redisTemplate.opsForValue().get(runningProjectKey), ProjectMessageDTO.class).getParallelism();
                 }

+ 4 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ManualProjectService.java → simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
 
 @Service
 @Slf4j
-public class ManualProjectService {
+public class ProjectService {
 
     @Value("${scheduler.minio-path.project-result}")
     String projectResultPathOfMinio;
@@ -110,7 +110,7 @@ public class ManualProjectService {
         //2 查询场景包叶子指标
         List<IndexTemplatePO> leafIndexList = allIndexList.stream().filter(index -> StringUtil.isNotEmpty(index.getRuleId())).collect(Collectors.toList());
         stringRedisTemplate.opsForValue().set(leafIndexPrefix, JsonUtil.listToJson(leafIndexList));
-        log.info("ManualProjectService--handlePackage 项目 " + projectId + " 的叶子指标为:" + leafIndexList);
+        log.info("ProjectService--handlePackage 项目 " + projectId + " 的叶子指标为:" + leafIndexList);
         List<ScenePO> sceneList = new ArrayList<>();
         leafIndexList.forEach(leafIndex -> {
             String naturalIds = leafIndex.getSceneNaturalIds();
@@ -129,7 +129,7 @@ public class ManualProjectService {
                 sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
             }
         });
-        log.info("ManualProjectService--handlePackage 项目" + projectId + " 共有 " + sceneList.size() + " 个任务!");
+        log.info("ProjectService--handlePackage 项目" + projectId + " 共有 " + sceneList.size() + " 个任务!");
         return sceneList;
     }
 
@@ -313,7 +313,7 @@ public class ManualProjectService {
             }
         }
 
-        log.info("ManualProjectService--handleAlgorithm 项目 " + projectId + " 使用的算法镜像为:" + dockerImage);
+        log.info("ProjectService--handleAlgorithm 项目 " + projectId + " 使用的算法镜像为:" + dockerImage);
         return dockerImage;
     }
 }

+ 9 - 28
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -1,8 +1,6 @@
 package com.css.simulation.resource.scheduler.service;
 
-import api.common.pojo.constants.DictConstants;
 import api.common.util.SshUtil;
-import api.common.util.TimeUtil;
 import com.css.simulation.resource.scheduler.manager.TaskManager;
 import com.css.simulation.resource.scheduler.mapper.IndexTemplateMapper;
 import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
@@ -64,13 +62,13 @@ public class TaskService {
         String projectId = taskPO.getPId();
         String userId = taskPO.getCreateUserId();
         PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaksId(userId, projectId, taskId);
-        SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, hostname, username, password);
+        SshClient sshClient = SshUtil.getClient();
+        ClientSession clientSession = SshUtil.getSession(sshClient, hostname, username, password);
         //1 判断项目是否已完成
-        boolean projectCompleted = taskManager.isProjectCompleted(redisPrefix, projectId, taskId, state, podName, session);
+        boolean projectCompleted = taskManager.isProjectCompleted(redisPrefix, projectId, taskId, state, podName, clientSession);
         if (!projectCompleted) {
-            session.close();
-            client.stop();
+            clientSession.close();
+            sshClient.stop();
             return;
         }
 
@@ -79,35 +77,18 @@ public class TaskService {
         taskManager.prepareScore(redisPrefix.getProjectRunningKey());
 
         //3 打分
-        taskManager.score(redisPrefix, userId, projectId, session);
+        taskManager.score(redisPrefix, userId, projectId, clientSession);
 
         // -------------------------------- 收尾 --------------------------------
-        manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成
-        log.info("TaskManager--score 项目 " + projectId + " 执行完成!");
 
-        stringRedisTemplate.delete(redisPrefix.getProjectRunningKey());
-        // 删除所有 key
-//        Set<String> keys = redisTemplate.keys("manualProject:" + projectId + "*");
-//        assert keys != null;
-//        redisTemplate.delete(keys);
-//        log.info("------- /state 任务 " + taskId + " 的父项目为:" + projectId);
+        taskManager.done(redisPrefix,sshClient, clientSession, projectId);
+        log.info("TaskService--taskState 项目 " + projectId + " 执行完成!");
 
 
-        // 删除 kafka topic
-//        SshClient clientKafka = SshUtil.getClient();
-//        ClientSession sessionKafka = SshUtil.getSession(clientKafka, hostnameKafka, usernameKafka, passwordKafka);
-//        String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", projectId);
-//        SshUtil.execute(sessionKafka, topicDeleteCommand);
-//        SshUtil.stop(clientKafka, sessionKafka);
-
-
-        // 删除 job
-        SshUtil.execute(session, "kubectl delete job project-" + projectId);
-        session.close();
-        client.stop();
 
     }
 
+
     public Boolean taskConfirm(String taskId) {
         return taskManager.taskConfirm(taskId);
     }