root 2 years ago
parent
commit
fc0ee5085c

+ 1 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/resource/TaskLock.java

@@ -37,13 +37,12 @@ public class TaskLock {
             ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()), ProjectMessageDTO.class);
             int taskTotal = projectMessageDTO.getTaskTotal();
             int taskCompleted = projectMessageDTO.getTaskCompleted();
-            log.info("TaskManager--isProjectCompleted 项目 " + projectId + " 完成进度为:" + (taskCompleted + 1) + "/" + taskTotal);
+            log.info("isProjectCompleted() 项目 " + projectId + " 完成进度为:" + (taskCompleted + 1) + "/" + taskTotal);
             if (taskCompleted + 1 == taskTotal) {
                 result = true;
             } else {    // 项目没有完成
                 projectMessageDTO.setTaskCompleted(taskCompleted + 1);  // 增加已完成任务数
                 stringRedisTemplate.opsForValue().set(redisPrefix.getProjectRunningKey(), JsonUtil.beanToJson(projectMessageDTO));
-
             }
         } catch (Exception e) {
             throw new RuntimeException(e);

+ 7 - 6
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/util/ProjectUtil.java

@@ -109,12 +109,13 @@ public class ProjectUtil {
             String key = "pod:" + podName + ":node";
             // 先删除 redis key
             RedisUtil.deleteByKey(stringRedisTemplate, "pod:" + podName + ":node");
+            RedisUtil.deleteByKey(stringRedisTemplate, "pod:" + podName + ":cpu");
             KubernetesUtil.deletePod(apiClient, kubernetesConfiguration.getNamespace(), podName);
             TimeUnit.SECONDS.sleep(10); // 暂停 10 秒等待资源完全释放。
-            log.info("ProjectUtil--deletePod 删除 pod 并删除 redis 键值对:" + key);
+            log.info("deletePod() 删除 pod 并删除 redis 键值对:" + key);
         } catch (Exception e) {
             e.printStackTrace();
-            log.error("ProjectUtil--deletePod pod " + podName + " 不存在。", e);
+            log.error("deletePod() pod " + podName + " 不存在。", e);
         }
     }
 
@@ -135,6 +136,7 @@ public class ProjectUtil {
         //1 删除上一个 pod 和 redis 键值对 和 旧的 yaml 文件
 //        new Thread(() -> deletePod(lastPodName), "delete-" + lastPodName).start();
         //
+        String cpuOrderString = stringRedisTemplate.opsForValue().get("pod:" + lastPodName + ":cpu");
         deletePod(lastPodName);
         List<String> list = FileUtil.listAbsolutePath(podYamlDirectory);
         Iterator<String> iterator1 = list.iterator();
@@ -148,7 +150,6 @@ public class ProjectUtil {
         }
         for (String absolutePath : list) {
             if (absolutePath.contains(nodeName) && absolutePath.contains(projectId)) {
-                String cpuOrderString = stringRedisTemplate.opsForValue().get("pod:" + lastPodName + ":cpu");
                 Optional.ofNullable(cpuOrderString).orElseThrow(() -> new RuntimeException("createNextPod2() pod " + lastPodName + " 缓存的 cpu 编号为空。"));
                 final String read = FileUtil.read(absolutePath);
                 final String replace = read.replace("cpu-order", "\"" + cpuOrderString + "\"");
@@ -213,7 +214,7 @@ public class ProjectUtil {
         } else if (autoSubProjectPO != null) {
             return autoSubProjectPO;
         }
-        throw new RuntimeException("ProjectUtil--getProjectByProjectId 不存在项目:" + projectId);
+        throw new RuntimeException("getProjectByProjectId() 不存在项目:" + projectId);
     }
 
 
@@ -251,7 +252,7 @@ public class ProjectUtil {
      */
     public Map<String, Integer> getNodeMap() {
         List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList(); // 预设并行度的节点列表
-        log.info("ProjectUtil--getNodeMap 预设并行度的节点列表为:" + initialNodeList);
+        log.info("getNodeMap() 预设并行度的节点列表为:" + initialNodeList);
         Map<String, Integer> resultNodeMap = new HashMap<>();    // 用于执行的节点映射(节点名,并行度)
         for (KubernetesNodeTO kubernetesNodeSource : initialNodeList) {
             KubernetesNodeTO kubernetesNodeCopy = kubernetesNodeSource.clone();
@@ -482,7 +483,7 @@ public class ProjectUtil {
         } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
             userId = autoSubProjectMapper.selectCreateUserById(projectId);
         } else {
-            throw new RuntimeException("PrjectUtil--getRedisPrefixByProjectIdAndProjectType 未知的项目类型!");
+            throw new RuntimeException("ProjectUtil--getRedisPrefixByProjectIdAndProjectType 未知的项目类型!");
         }
 
         //3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)

+ 3 - 6
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/manager/TaskManager.java

@@ -5,7 +5,6 @@ import api.common.pojo.constants.DictConstants;
 import api.common.util.*;
 import com.css.simulation.resource.scheduler.dao.mapper.*;
 import com.css.simulation.resource.scheduler.service.feign.VideoService;
-import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.dao.entity.IndexTemplatePO;
 import com.css.simulation.resource.scheduler.dao.entity.LeafIndexPO;
 import com.css.simulation.resource.scheduler.dao.entity.ProjectPO;
@@ -117,14 +116,12 @@ public class TaskManager {
             taskTick(taskId); // 刷新一下心跳
             log.info("TaskManager--state 修改任务 " + taskId + " 的状态为 " + state + ",pod 名称为:" + podName);
             taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
-            // 插入证书使用数据
-
             return false;
         } else { // 结束的 pod 都直接删除,并判断项目是否完成
             // -------------------------------- 处理状态 --------------------------------
             // TODO 暂时不用重试操作
             try {
-                log.info("TaskManager--state 修改任务 " + taskId + "的状态为 " + state + ",pod 名称为:" + podName + ",并删除 pod。");
+                log.info("state() 修改任务 " + taskId + "的状态为 " + state + ",pod 名称为:" + podName + ",并删除 pod。");
                 if ("Aborted".equals(state)) {
                     String minioPathOfErrorLog = resultPathMinio + projectId + "/" + taskId + "error.log";
                     boolean objectExist = MinioUtil.isObjectExist(minioClient, bucketName, minioPathOfErrorLog);
@@ -153,7 +150,7 @@ public class TaskManager {
                     taskMapper.updateSuccessStateWithStopTime(taskId, state, TimeUtil.getNowForMysql());
                     // 查询项目是否使用 gpu 生成视频(0是1否)
                     String isChoiceGpu = projectUtil.getProjectByProjectId(projectId).getIsChoiceGpu();
-                    log.info("项目 " + projectId + " 是否需要生成 gpu 视频(0是1否):" + isChoiceGpu);
+                    log.info("项目 " + projectId + " 是否需要生成 gpu 视频( 0是 1否 ):" + isChoiceGpu);
                     if ("1".equals(isChoiceGpu)) {
                         FutureTask<ResponseBodyVO<String>> videoTask = new FutureTask<>(() -> videoService.generateVideo(projectId, projectType, maxSimulationTime, taskId));
                         new Thread(videoTask, "video-" + StringUtil.getRandomEightBitUUID()).start();
@@ -469,7 +466,7 @@ public class TaskManager {
     }
 
     public void taskTick(String taskId) {
-        log.info("TaskService--taskTick 任务 " + taskId + "心跳!");
+        log.info("taskTick() 任务 " + taskId + "心跳!");
         TaskPO taskPO = taskMapper.selectById(taskId);
         String projectId = taskPO.getPId();
         String userId = taskPO.getCreateUserId();

+ 22 - 16
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -295,28 +295,34 @@ public class ProjectService {
     public void stopProject(String projectId, String projectType) {
 
         //1 判断项目是否已经运行
+        PrefixTO redisPrefix = projectUtil.getRedisPrefixByProjectIdAndProjectType(projectId, projectType);
+        final String projectRunningKeyPrefix = redisPrefix.getProjectRunningKey();
+        final Set<String> projectRunningKeySet = RedisUtil.getKeySetByPrefix(stringRedisTemplate, projectRunningKeyPrefix);
+        boolean isRunning = true;
+        if (CollectionUtil.isEmpty(projectRunningKeySet)) {
+            isRunning = false;
+        }
 
 
-        //7 删除 kafka 消息
+        //2 删除 kafka 消息
         ApacheKafkaUtil.deleteTopic(kafkaAdminClient, projectId);
-
-        //1 删除项目所有任务
+        //3 删除项目所有任务
         taskMapper.deleteByProject(projectId);
-
-        //2 根据 pod 前缀删除所有 pod
-        String podPrefix = "project-" + projectId;
-        Set<String> nodeOfPodKeySet = RedisUtil.getKeySetByPrefix(stringRedisTemplate, "pod:" + podPrefix);
-        for (String nodeOfPodKey : nodeOfPodKeySet) {
-            String podName = nodeOfPodKey.split(":")[1];
-            String nodeName = projectUtil.getNodeNameOfPod(podName);
-            // 删除 pod
-            projectUtil.deletePod(podName);
-            // 节点并行度加一
-            projectUtil.addOneParallelismToNode(nodeName);
+        if (isRunning){
+            //2 根据 pod 前缀删除所有 pod
+            String podPrefix = "project-" + projectId;
+            Set<String> nodeOfPodKeySet = RedisUtil.getKeySetByPrefix(stringRedisTemplate, "pod:" + podPrefix);
+            for (String nodeOfPodKey : nodeOfPodKeySet) {
+                String podName = nodeOfPodKey.split(":")[1];
+                String nodeName = projectUtil.getNodeNameOfPod(podName);
+                // 删除 pod
+                projectUtil.deletePod(podName);
+                // 节点并行度加一
+                projectUtil.addOneParallelismToNode(nodeName);
+            }
         }
 
         //3 其他 redis key
-        PrefixTO redisPrefix = projectUtil.getRedisPrefixByProjectIdAndProjectType(projectId, projectType);
         RedisUtil.deleteByPrefix(stringRedisTemplate, "project:" + projectId + ":package:");
         RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getProjectRunningKey());
         RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getProjectWaitingKey());
@@ -331,7 +337,7 @@ public class ProjectService {
         MinioUtil.rmR(minioClient, bucketName, projectResultPathOfMinio + projectId + "/");
 
 
-        log.info("ProjectService.stopProject() 项目 " + projectId + " 终止成功!");
+        log.info("stopProject() 项目 " + projectId + " 终止成功!");
     }
 
 

+ 1 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/consumer/ProjectConsumer.java

@@ -518,7 +518,6 @@ public class ProjectConsumer {
         String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
         // -------------------------------- 4 发送任务消息 --------------------------------
         List<NodeTO> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
-        log.info("parseProject() 项目 " + projectId + " 运行在:" + nodeListToCount);
         int messageNumber = 0;
         ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1);   // 创建主题
         TimeUnit.SECONDS.sleep(7);
@@ -557,7 +556,7 @@ public class ProjectConsumer {
                 }
             }
             if (currentNodeTO == null) {
-                String errorMessage = "parseProject() 挑选节点失败";
+                String errorMessage = "parseProject() 挑选节点失败";
                 log.info(errorMessage);
                 throw new RuntimeException(errorMessage);
             }