root 2 년 전
부모
커밋
c1478e6975

+ 3 - 21
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/configuration/init/CustomApplicationRunner.java

@@ -1,40 +1,22 @@
 package com.css.simulation.resource.scheduler.common.configuration.init;
 
-import api.common.util.CollectionUtil;
-import com.css.simulation.resource.scheduler.common.configuration.kubernetes.KubernetesConfiguration;
-import com.css.simulation.resource.scheduler.common.util.KubernetesUtil;
-import com.css.simulation.resource.scheduler.service.domain.KubernetesNodeTO;
-import io.kubernetes.client.openapi.ApiClient;
+import com.css.simulation.resource.scheduler.common.util.ProjectUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
-import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
-import java.util.List;
 
 @Component
 @Slf4j
 public class CustomApplicationRunner implements ApplicationRunner {
     @Resource
-    private KubernetesConfiguration kubernetesConfiguration;
-    @Resource
-    private ApiClient apiClient;
-    @Resource
-    private StringRedisTemplate stringRedisTemplate;
+    private ProjectUtil projectUtil;
 
 
     @Override
     public void run(ApplicationArguments args) {
-        List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList();
-        List<String> podNameList = KubernetesUtil.getPod(apiClient, kubernetesConfiguration.getNamespace());
-        if (CollectionUtil.isEmpty(podNameList)) {
-            for (KubernetesNodeTO kubernetesNodeTO : initialNodeList) {
-                stringRedisTemplate.opsForValue().set("node:" + kubernetesNodeTO.getName() + ":parallelism", kubernetesNodeTO.getMaxParallelism() + "");
-            }
-            log.info("重置节点并行度为:" + initialNodeList);
-        }
-
+        projectUtil.resetNodeParallelism();
     }
 }

+ 28 - 19
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/common/util/ProjectUtil.java

@@ -20,7 +20,6 @@ import lombok.Synchronized;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
@@ -55,8 +54,6 @@ public class ProjectUtil {
     @Resource
     private ClusterMapper clusterMapper;
     @Resource
-    private KafkaTemplate<String, String> kafkaTemplate;
-    @Resource
     private KubernetesConfiguration kubernetesConfiguration;
     @Resource
     private ApiClient apiClient;
@@ -109,21 +106,22 @@ public class ProjectUtil {
         try {
             // 先删除 redis key
             KubernetesUtil.deletePod(apiClient, kubernetesConfiguration.getNamespace(), podName);
-            log.info("deletePod() 等待 pod " + podName + " 的资源释放完成。");
+            log.info("等待 pod " + podName + " 的资源释放完成。");
             TimeUnit.SECONDS.sleep(7);
         } catch (ApiException apiException) {
-            log.info("deletePod() pod " + podName + " 已删除。");
+            log.info("pod " + podName + " 已删除。");
         } catch (Exception e) {
             e.printStackTrace();
-            log.error("deletePod() 删除 pod " + podName + " 报错。", e);
+            log.error("删除 pod " + podName + " 报错。", e);
         }
     }
 
 
-    public String getNodeNameOfPod(String podName) {
-        final String s = stringRedisTemplate.opsForValue().get("pod:" + podName + ":node");
+    public String getNodeNameOfPod(String projectId, String podName) {
+        String key = "project:" + projectId + ":pod:" + podName + ":node";
+        final String s = stringRedisTemplate.opsForValue().get(key);
         if (StringUtil.isEmpty(s)) {
-            throw new RuntimeException("getNodeNameOfPod() 无法获取 pod 运行所在节点:" + "pod:" + podName + ":node");
+            throw new RuntimeException("无法获取 pod 运行所在节点:" + key);
         }
         return s;
     }
@@ -139,9 +137,9 @@ public class ProjectUtil {
     @SneakyThrows
     public void createNextPod3(String projectId, String nodeName, String lastPodName) {
         log.info("createNextPod3() 删除上一个 pod:projectId={},nodeName={},lastPodName={}", projectId, nodeName, lastPodName);
-        String cpuOrderString = stringRedisTemplate.opsForValue().get("pod:" + lastPodName + ":cpu");
+        String cpuOrderString = stringRedisTemplate.opsForValue().get("project:" + projectId + ":pod:" + lastPodName + ":cpu");
         deletePod(lastPodName);
-        RedisUtil.deleteByKey(stringRedisTemplate, "pod:" + lastPodName + ":cpu");
+        RedisUtil.deleteByKey(stringRedisTemplate, "project:" + projectId + ":pod:" + lastPodName + ":cpu");
         //2 获取新的 yaml 信息
         final Set<String> yamlPathCacheKeySet = RedisUtil.getKeySetByPrefix(stringRedisTemplate, "project:" + projectId + ":node:" + nodeName + ":yaml");
         if (CollectionUtil.isEmpty(yamlPathCacheKeySet)) {
@@ -158,7 +156,7 @@ public class ProjectUtil {
             final String replace = read.replace("cpu-order", "\"" + cpuOrderString + "\"");
             FileUtil.writeStringToLocalFile(replace, absolutePath);
             // 创建 pod
-            createPod3(yamlPathCacheKey, cpuOrderString);
+            createPod3(projectId, yamlPathCacheKey, cpuOrderString);
             log.info("createNextPod3() 创建项目 " + projectId + " 在节点 " + nodeName + " 的下一个 pod,使用 cpu 编号为 " + cpuOrderString);
         }
     }
@@ -166,7 +164,7 @@ public class ProjectUtil {
     /**
      * @param redisKey yaml 地址的缓存 key
      */
-    public void createPodBegin(String redisKey) {
+    public void createPodBegin(String projectId,String redisKey) {
         final String podYamlPath = stringRedisTemplate.opsForValue().get(redisKey);
         if (podYamlPath == null) {
             throw new RuntimeException("createPod3() 根据缓存 key 获取 yaml 地址为 null:" + redisKey);
@@ -174,7 +172,7 @@ public class ProjectUtil {
         stringRedisTemplate.delete(redisKey);
         String nodeName = new File(podYamlPath).getName().split("#")[0];
         String podName = podYamlPath.split("#")[1].split("\\.")[0];
-        stringRedisTemplate.opsForValue().set("pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
+        stringRedisTemplate.opsForValue().set("project:" + projectId + ":pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
         new Thread(() -> KubernetesUtil.applyYaml(hostname, username, password, podYamlPath), "create-" + podName).start();
     }
 
@@ -182,7 +180,7 @@ public class ProjectUtil {
     /**
      * @param redisKey yaml 地址的缓存 key
      */
-    public void createPod3(String redisKey, String cpuOrderString) {
+    public void createPod3(String projectId, String redisKey, String cpuOrderString) {
         final String podYamlPath = stringRedisTemplate.opsForValue().get(redisKey);
         if (podYamlPath == null) {
             throw new RuntimeException("createPod3() 根据缓存 key 获取 yaml 地址为 null:" + redisKey);
@@ -190,8 +188,8 @@ public class ProjectUtil {
         stringRedisTemplate.delete(redisKey);
         String nodeName = new File(podYamlPath).getName().split("#")[0];
         String podName = podYamlPath.split("#")[1].split("\\.")[0];
-        stringRedisTemplate.opsForValue().set("pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
-        stringRedisTemplate.opsForValue().set("pod:" + podName + ":cpu", cpuOrderString);    // 将 pod 运行在哪个 node 上记录到 redis
+        stringRedisTemplate.opsForValue().set("project:" + projectId + ":pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
+        stringRedisTemplate.opsForValue().set("project:" + projectId + ":pod:" + podName + ":cpu", cpuOrderString);    // 将 pod 运行在哪个 node 上记录到 redis
         new Thread(() -> KubernetesUtil.applyYaml(hostname, username, password, podYamlPath), "create-" + podName).start();
     }
 
@@ -488,11 +486,11 @@ public class ProjectUtil {
         log.info("归还节点 " + nodeName + " 并行度:" + parallelismBefore + " --> " + parallelismAfter);
     }
 
-    public void parallelismAddOne(String nodeName){
+    public void parallelismAddOne(String nodeName) {
 
     }
 
-    public void parallelismReduceOne(String nodeName){
+    public void parallelismReduceOne(String nodeName) {
 
     }
 
@@ -505,4 +503,15 @@ public class ProjectUtil {
             return keys.stream().filter(key -> key.contains("waiting") && key.contains("message")).collect(Collectors.toList());
         }
     }
+
+    public void resetNodeParallelism() {
+        List<KubernetesNodeTO> initialNodeList = kubernetesConfiguration.getNodeList();
+        List<String> podNameList = KubernetesUtil.getPod(apiClient, kubernetesConfiguration.getNamespace());
+        if (CollectionUtil.isEmpty(podNameList)) {
+            for (KubernetesNodeTO kubernetesNodeTO : initialNodeList) {
+                stringRedisTemplate.opsForValue().set("node:" + kubernetesNodeTO.getName() + ":parallelism", kubernetesNodeTO.getMaxParallelism() + "");
+            }
+            log.info("重置节点并行度为:" + initialNodeList);
+        }
+    }
 }

+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/manager/ProjectManager.java

@@ -83,7 +83,7 @@ public class ProjectManager {
             String replace14;
             if (cpuOrder != null) {
                 replace14 = replace13.replace("cpu-order", "\"" + cpuOrder + "\"");     // 指定 cpu 编号
-                stringRedisTemplate.opsForValue().set("pod:" + podName + ":cpu", cpuOrder + "");    //  pod 运行使用的 cpu编号
+                stringRedisTemplate.opsForValue().set("project:" + projectId + ":pod:" + podName + ":cpu", cpuOrder + "");    //  pod 运行使用的 cpu编号
             } else {
                 replace14 = replace13;
             }
@@ -125,7 +125,7 @@ public class ProjectManager {
             String replace19;
             if (cpuOrder != null) {
                 replace19 = replace18.replace("cpu-order", "\"" + cpuOrder + "\"");     // 指定 cpu 编号
-                stringRedisTemplate.opsForValue().set("pod:" + podName + ":cpu", cpuOrder + "");    //  pod 运行使用的 cpu编号
+                stringRedisTemplate.opsForValue().set("project:" + projectId +":pod:" + podName + ":cpu", cpuOrder + "");    //  pod 运行使用的 cpu编号
             } else {
                 replace19 = replace18;
             }

+ 28 - 47
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/dao/manager/TaskManager.java

@@ -4,19 +4,16 @@ import api.common.pojo.common.ResponseBodyVO;
 import api.common.pojo.constants.DictConstants;
 import api.common.util.*;
 import com.css.simulation.resource.scheduler.common.configuration.kubernetes.KubernetesConfiguration;
-import com.css.simulation.resource.scheduler.common.util.KubernetesUtil;
-import com.css.simulation.resource.scheduler.dao.mapper.*;
-import com.css.simulation.resource.scheduler.service.feign.VideoService;
+import com.css.simulation.resource.scheduler.common.resource.TaskLock;
+import com.css.simulation.resource.scheduler.common.util.*;
 import com.css.simulation.resource.scheduler.dao.entity.IndexTemplatePO;
 import com.css.simulation.resource.scheduler.dao.entity.LeafIndexPO;
 import com.css.simulation.resource.scheduler.dao.entity.ProjectPO;
 import com.css.simulation.resource.scheduler.dao.entity.TaskPO;
+import com.css.simulation.resource.scheduler.dao.mapper.*;
 import com.css.simulation.resource.scheduler.service.domain.PrefixTO;
 import com.css.simulation.resource.scheduler.service.domain.ScoreTO;
-import com.css.simulation.resource.scheduler.common.resource.TaskLock;
-import com.css.simulation.resource.scheduler.common.util.MinioUtil;
-import com.css.simulation.resource.scheduler.common.util.ProjectUtil;
-import com.css.simulation.resource.scheduler.common.util.RedisUtil;
+import com.css.simulation.resource.scheduler.service.feign.VideoService;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.kubernetes.client.openapi.ApiClient;
@@ -28,6 +25,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.ibatis.session.ExecutorType;
 import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
 import org.springframework.beans.factory.annotation.Value;
@@ -99,6 +97,8 @@ public class TaskManager {
     private KubernetesConfiguration kubernetesConfiguration;
     @Resource
     private ApiClient apiClient;
+    @Resource(name = "myKafkaAdmin")
+    private Admin admin;
 
     public void batchInsertTask(List<TaskPO> taskPOList) {
         try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
@@ -116,7 +116,7 @@ public class TaskManager {
     @SneakyThrows
     public boolean isProjectCompleted(PrefixTO redisPrefix, String projectId, String projectType, String maxSimulationTime, String taskId, String state, String podName) {
         boolean result = false;
-        String nodeName = projectUtil.getNodeNameOfPod(podName);
+        String nodeName = projectUtil.getNodeNameOfPod(projectId,podName);
         if ("Running".equals(state)) {  // 运行中的 pod 无需删除
             // 将运行中的任务的 pod 名称放入 redis
             stringRedisTemplate.opsForValue().set(redisPrefix.getTaskPodKey(), podName);
@@ -193,7 +193,7 @@ public class TaskManager {
      * @param userId 项目创建用户的 id
      */
     @SneakyThrows
-    public void score(PrefixTO redisPrefix, String userId, String projectId, String projectType, ClientSession session) {
+    public void score(String userId, String projectId, String projectType, ClientSession session) {
         // -------------------------------- 打分 --------------------------------
         ProjectPO projectPO = null;
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
@@ -205,7 +205,7 @@ public class TaskManager {
         TimeUnit.SECONDS.sleep(10); // 先等一下数据库更新
         List<TaskPO> taskList = taskMapper.selectTaskListByProjectId(projectId);  // 所有任务信息
         if (CollectionUtil.isEmpty(taskList)) {
-            log.error("TaskManager--score 项目 " + projectId + " 下没有查询到任务!");
+            log.error("项目 " + projectId + " 下没有查询到任务!");
             return;
         }
         indexMapper.deleteFirstByProjectId(projectId);
@@ -217,13 +217,7 @@ public class TaskManager {
         String leafIndexTemplateListJson = stringRedisTemplate.opsForValue().get(leafIndexKey);
         List<IndexTemplatePO> allIndexTemplateList = JsonUtil.jsonToList(allIndexTemplateListJson, IndexTemplatePO.class);
         List<IndexTemplatePO> leafIndexTemplateList = JsonUtil.jsonToList(leafIndexTemplateListJson, IndexTemplatePO.class);
-//        List<IndexTemplatePO> allIndexTemplateList = JsonUtil.jsonToList(
-//                FileUtil.read(linuxTempPath + "project/" + projectId + "/all-index-list.json"),
-//                IndexTemplatePO.class);
-//        List<IndexTemplatePO> leafIndexTemplateList = JsonUtil.jsonToList(
-//                FileUtil.read(linuxTempPath + "project/" + projectId + "/leaf-index-list.json"),
-//                IndexTemplatePO.class);
-        log.info("TaskManager--score 共有 " + leafIndexTemplateList.size() + "个叶子节点:" + leafIndexTemplateListJson);
+        log.info("共有 " + leafIndexTemplateList.size() + "个叶子节点:" + leafIndexTemplateListJson);
         int maxLevel = 1; // 用于计算指标得分
         List<LeafIndexPO> leafIndexList = new ArrayList<>();
         for (int i = 0; i < leafIndexTemplateList.size(); i++) {
@@ -239,7 +233,7 @@ public class TaskManager {
             if (packageLevel > maxLevel) {
                 maxLevel = packageLevel;
             }
-            log.info("TaskService--state 开始执行对第 " + (i + 1) + " 个叶子节点 " + indexId + " 进行打分!");
+            log.info("开始执行对第 " + (i + 1) + " 个叶子节点 " + indexId + " 进行打分!");
             // 根据叶子指标id查询评分规则创建用户id
             String createUserIdOfRule = scoringRulesMapper.selectCreateUserIdByIndexId(indexId);
             //1 判断有没有用户目录,没有则复制
@@ -248,20 +242,18 @@ public class TaskManager {
                 // 复制 main.py
                 FileUtil.createDirectory(scoreDirectoryOfUser);
                 FileUtil.cpR(pyPath, scoreDirectoryOfUser);
-//                LinuxUtil.execute("cp -r " + pyPath + "* " + scoreDirectoryOfUser);
             }
             //2 将打分规则保存到script目录
 
             String ruleFilePath = scoreDirectoryOfUser + "scripts/" + ruleName.split("_")[0] + "/" + ruleName + ".py";
             FileUtil.writeInputStreamToLocalFile(IoUtil.stringToInputStream(ruleDetails), ruleFilePath);
-            log.info("TaskService--state 将叶子节点 " + indexId + " 对应的打分规则保存到临时目录:" + ruleFilePath);
+            log.info("将叶子节点 " + indexId + " 对应的打分规则保存到临时目录:" + ruleFilePath);
             List<TaskPO> taskListOfLeafIndex = taskList.stream()
                     .filter(task -> indexId.equals(task.getLastTargetId()))
                     .collect(Collectors.toList());
-            log.info("TaskService--state 叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个任务:" + taskListOfLeafIndex);
+            log.info("叶子节点 " + indexId + " 包括 " + taskListOfLeafIndex.size() + " 个任务:" + taskListOfLeafIndex);
             // 计算叶子指标的得分
             // 使用 stream 流会出现无法进入循环的情况
-            // taskListOfLeafIndex.forEach(taskOfLeaf -> {});
             for (TaskPO taskOfLeaf : taskListOfLeafIndex) {
                 String task2Id = taskOfLeaf.getId();
 
@@ -277,10 +269,10 @@ public class TaskManager {
                     String scoreCommand = "python3 " + scoreDirectoryOfUser + "main.py " + result1OfLinux + " " + result2OfLinux + " " + taskOfLeaf.getSceneType() + " " + ruleName; // 指定打分脚本
                     String scoreResult;
                     ScoreTO score = null;
-                    log.info("TaskService--state 下载 minio 上的结果文件 " + result1OfMinio + " 和 " + result2OfMinio + " 到临时目录:" + linuxTempPath);
+                    log.info("下载 minio 上的结果文件 " + result1OfMinio + " 和 " + result2OfMinio + " 到临时目录:" + linuxTempPath);
                     MinioUtil.downloadToFile(minioClient, bucketName, result1OfMinio, result1OfLinux);  // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
                     MinioUtil.downloadToFile(minioClient, bucketName, result2OfMinio, result2OfLinux);  // 也可改成下载到指定ip的服务器上,需要保证和打分脚本在一台机器上。
-                    log.info("TaskService--state 开始执行打分命令:" + scoreCommand);
+                    log.info("开始执行打分命令:" + scoreCommand);
                     Runtime r = Runtime.getRuntime();
                     Process p = r.exec(scoreCommand, null, new File(scoreDirectoryOfUser));
                     BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
@@ -290,13 +282,12 @@ public class TaskManager {
                         sb.append(inline).append("\n");
                     }
                     scoreResult = sb.toString();
-//                    scoreResult = SshUtil.execute(session, scoreCommand);
-                    log.info("TaskService--state 项目" + projectId + " 的任务 " + task2Id + " 打分结束,结果为:" + scoreResult);
+                    log.info("项目" + projectId + " 的任务 " + task2Id + " 打分结束,结果为:" + scoreResult);
                     String replace = StringUtil.replace(scoreResult, "'", "\"");
                     try {
                         score = JsonUtil.jsonToBean(replace, ScoreTO.class);
                     } catch (Exception e) { // 打分失败
-                        log.info("TaskManager--state 项目" + projectId + " 的任务 " + task2Id + " 打分失败:", e);
+                        log.info("项目" + projectId + " 的任务 " + task2Id + " 打分失败:", e);
                     }
                     if (score != null) {
                         taskOfLeaf.setReturnSceneId(score.getUnit_scene_ID());
@@ -322,9 +313,6 @@ public class TaskManager {
                         taskOfLeaf.setScored(false);
                         taskMapper.updateFailStateWithStopTime(task2Id, DictConstants.TASK_ABORTED, TimeUtil.getNowForMysql(), DictConstants.TASK_ERROR_REASON_5);
                     }
-                    // 删除打分使用的文件
-//                    FileUtil.rm(result1OfLinux);
-//                    FileUtil.rm(result2OfLinux);
                 }
             }
 
@@ -355,7 +343,7 @@ public class TaskManager {
 
             // 计算任务的个数
             long taskNumberToScore = taskListOfLeafIndex.size();
-            log.info("TaskManager--score 项目 " + projectId + " 的叶子指标 " + indexId
+            log.info("项目 " + projectId + " 的叶子指标 " + indexId
                     + " 下参与计算的任务总数为 " + taskNumberToScore
                     + ":仿真异常场景个数 " + errorSceneNumber
                     + "、未达标场景个数 " + notStandardSceneNumber
@@ -394,14 +382,14 @@ public class TaskManager {
         // 保存叶子指标得分
         taskIndexManager.batchInsertLeafIndex(leafIndexList);
         // 保存一级指标分数
-        log.info("TaskManager--score 项目 " + projectId + " 的所有任务分数为:" + taskList);
+        log.info("项目 " + projectId + " 的所有任务分数为:" + taskList);
         computeFirst(leafIndexList, allIndexTemplateList, projectId, maxLevel);
-        log.info("TaskManager--score 项目 " + projectId + " 打分完成!");
+        log.info("项目 " + projectId + " 打分完成!");
     }
 
     public void computeFirst(List<LeafIndexPO> leafIndexList, List<IndexTemplatePO> allIndexTemplateList, String projectId, int maxLevel) {
 
-        log.info("------- /state computeFirst 计算父指标得分:" + leafIndexList);
+        log.info("计算父指标得分:" + leafIndexList);
         Iterator<LeafIndexPO> leafTaskIndexIterator = leafIndexList.iterator();
         // 把 1 级的指标得分直接保存
         while (leafTaskIndexIterator.hasNext()) {
@@ -460,18 +448,18 @@ public class TaskManager {
         String tokenUrl = tokenUri + "?grant_type=client_credentials"
                 + "&client_id=" + clientId
                 + "&client_secret=" + clientSecret;
-        log.info("TaskService--state 获取仿真云平台 token:" + tokenUrl);
+        log.info("获取仿真云平台 token:" + tokenUrl);
         String response = HttpUtil.get(closeableHttpClient, requestConfig, tokenUrl);
         ObjectMapper objectMapper = new ObjectMapper();
         JsonNode jsonNode = objectMapper.readTree(response);
         String accessToken = jsonNode.path("access_token").asText();
-        log.info("TaskService--state 仿真云平台 token 为:" + accessToken);
+        log.info("仿真云平台 token 为:" + accessToken);
         Map<String, String> headers = new HashMap<>();
         headers.put("Authorization", "Bearer " + accessToken);
         Map<String, String> params = new HashMap<>();
         params.put("id", projectId);
         String post = HttpUtil.post(closeableHttpClient, requestConfig, evaluationLevelUri, headers, params);
-        log.info("TaskService--state 访问仿真云平台评价等级接口:" + evaluationLevelUri + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
+        log.info("访问仿真云平台评价等级接口:" + evaluationLevelUri + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + post);
     }
 
 
@@ -508,25 +496,18 @@ public class TaskManager {
         } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
             autoSubProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());
         }
-
-
         // 删除 kafka topic
-//        SshClient clientKafka = SshUtil.getClient();
-//        ClientSession sessionKafka = SshUtil.getSession(clientKafka, hostnameKafka, usernameKafka, passwordKafka);
-//        String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", projectId);
-//        SshUtil.execute(sessionKafka, topicDeleteCommand);
-//        SshUtil.stop(clientKafka, sessionKafka);
+        ApacheKafkaUtil.deleteTopic(admin, projectId);
 
         // 删除 redis 中的 项目运行信息 键值对
         RedisUtil.deleteByPrefix(stringRedisTemplate, redisPrefix.getProjectRunningKey());
         RedisUtil.deleteByPrefix(stringRedisTemplate, "project:" + projectId);
-        RedisUtil.deleteByPrefix(stringRedisTemplate, "pod:project-" + projectId);
         // 删除剩余 yaml
         projectUtil.deleteYamlByProjectId(projectId);
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-            log.info("done() 手动运行项目 " + projectId + " 执行完成!");
+            log.info("手动运行项目 " + projectId + " 执行完成!");
         } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-            log.info("done() 自动运行子项目 " + projectId + " 执行完成!");
+            log.info("自动运行子项目 " + projectId + " 执行完成!");
         }
 
 

+ 9 - 10
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -213,7 +213,7 @@ public class ProjectService {
         String algorithmDirectoryLinuxTempPath;
         String algorithmTarLinuxTempPath = null;
         if (algorithmPO != null) {
-            log.info("handleAlgorithm() 项目" + projectId + "需要使用仿真平台自己的算法 " + algorithmPO);
+            log.info("项目" + projectId + "需要使用仿真平台自己的算法 " + algorithmPO);
             String algorithmCode = algorithmPO.getAlgorithmCode();
             String dockerImport = algorithmPO.getDockerImport();
             dockerImage = dockerConfiguration.getRegistry() + "/algorithm_" + algorithmCode + ":latest";
@@ -224,14 +224,14 @@ public class ProjectService {
                 if (DictConstants.ALGORITHM_UPLOAD_MODE_FILE.equals(uploadMode)) {
                     algorithmTarLinuxTempPath = linuxTempPath + "algorithm-file/" + algorithmCode + "/" + algorithmCode + ".tar";
                     String minioPath = algorithmPO.getMinioPath();
-                    log.info("handleAlgorithm() 下载 minio 算法文件 " + minioPath + " 到本地 " + algorithmTarLinuxTempPath);
+                    log.info("下载 minio 算法文件 " + minioPath + " 到本地 " + algorithmTarLinuxTempPath);
                     MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
                 } else if (DictConstants.ALGORITHM_UPLOAD_MODE_GIT.equals(uploadMode)) {
                     algorithmDirectoryLinuxTempPath = linuxTempPath + "algorithm-git/" + algorithmCode + "/";
                     String gitUrl = algorithmPO.getGitUrl().replace(gitConfiguration.getName(), gitConfiguration.getUrl());
                     String gitUserName = algorithmPO.getGitUserName();
                     String gitPassword = algorithmPO.getGitPassword();
-                    log.info("handleAlgorithm() 下载 git 算法文件 " + gitUrl + " 到本地 " + algorithmDirectoryLinuxTempPath);
+                    log.info("下载 git 算法文件 " + gitUrl + " 到本地 " + algorithmDirectoryLinuxTempPath);
                     GitUtil.clone(gitUrl, gitUserName, gitPassword, algorithmDirectoryLinuxTempPath, true);
                     for (String filename : Objects.requireNonNull(new File(algorithmDirectoryLinuxTempPath).list())) {
                         if (filename.endsWith(".tar")) {
@@ -283,7 +283,7 @@ public class ProjectService {
 //        LinuxUtil.execute("docker tag " + algorithmTarLinuxTempPath + " " + dockerImage);   // 标记镜像名称
 //        LinuxUtil.execute("docker login " + algorithmTarLinuxTempPath + " " + dockerImage); // 登录 harbor
 //        LinuxUtil.execute("docker push " + algorithmTarLinuxTempPath + " " + dockerImage);  // 推送镜像到仓库
-        log.info("handleAlgorithm 项目 " + projectId + " 使用的算法镜像为:" + dockerImage);
+        log.info("项目 " + projectId + " 使用的算法镜像为:" + dockerImage);
         return dockerImage;
     }
 
@@ -309,11 +309,10 @@ public class ProjectService {
 
         //4 根据 pod 前缀删除所有 pod
         if (isRunning) {
-            String podPrefix = "project-" + projectId;
-            Set<String> nodeOfPodKeySet = RedisUtil.getKeySetByPrefix(stringRedisTemplate, "pod:" + podPrefix);
-            for (String nodeOfPodKey : nodeOfPodKeySet) {
-                String podName = nodeOfPodKey.split(":")[1];
-                String nodeName = projectUtil.getNodeNameOfPod(podName);
+            Set<String> nodeOfPodKeySet = RedisUtil.getKeySetByPrefix(stringRedisTemplate, "project:" + projectId + ":pod:" + "project-" + projectId);
+            final Set<String> podNameSet = nodeOfPodKeySet.stream().map(key -> key.split(":")[3]).collect(Collectors.toSet());
+            for (String podName : podNameSet) {
+                String nodeName = projectUtil.getNodeNameOfPod(projectId, podName);
                 // 删除 pod
                 projectUtil.deletePod(podName);
                 // 节点并行度加一
@@ -335,7 +334,7 @@ public class ProjectService {
         //8 删除 minio 残留文件
         MinioUtil.rmR(minioClient, bucketName, projectResultPathOfMinio + projectId + "/");
 
-        log.info("stopProject() 项目 " + projectId + " 终止成功!");
+        log.info("项目 " + projectId + " 终止成功!");
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
             manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql());
         } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {

+ 2 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -57,10 +57,10 @@ public class TaskService {
         SshClient sshClient = SshUtil.getClient();
         ClientSession clientSession = SshUtil.getSession(sshClient, hostname, username, password);
         //2 准备打分
-        log.info("项目 " + projectId + "准备打分");
+        log.info("项目 " + projectId + "准备打分");
         taskManager.prepareScore(redisPrefix.getProjectRunningKey());
         //3 打分
-        taskManager.score(redisPrefix, userId, projectId, projectType, clientSession);
+        taskManager.score(userId, projectId, projectType, clientSession);
         //4 调用 server 的接口,计算评价等级
         taskManager.evaluationLevel(projectId);
         //5 结束

+ 13 - 13
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/consumer/ProjectConsumer.java

@@ -457,12 +457,12 @@ public class ProjectConsumer {
         int restParallelism = projectUtil.getRestParallelism();
         //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
         if (restParallelism > 0L) {
-            log.info("run() 集群 " + clusterId + " 执行项目 " + projectId);
+            log.info("集群 " + clusterId + " 执行项目 " + projectId);
             // 设置实际的并行度
             projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism));   // 设置实际的并行度
             parseProject(projectMessageDTO, projectRunningKey);
         } else {
-            log.info("run() 服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
+            log.info("服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
             wait(projectWaitingKey, projectMessageDTO);
         }
     }
@@ -477,7 +477,7 @@ public class ProjectConsumer {
         String modelType = projectMessageDTO.getModelType();
         String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
         ProjectPO projectPO = projectUtil.getProjectByProjectId(projectId);
-        log.info("parseProject() 项目 " + projectId + " 信息为:" + projectPO);
+        log.info("项目 " + projectId + " 信息为:" + projectPO);
         String isChoiceGpu = projectPO.getIsChoiceGpu();
         // 项目类型
         int currentParallelism = projectMessageDTO.getCurrentParallelism();   // 当前并行度
@@ -509,7 +509,7 @@ public class ProjectConsumer {
         // 重新设置实际使用的并行度并保存到 redis
         int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
         projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
-        log.info("parseProject() 项目 " + projectId + " 运行在:" + nodeMap);
+        log.info("项目 " + projectId + " 运行在:" + nodeMap);
         stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
         //* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
         String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
@@ -533,13 +533,13 @@ public class ProjectConsumer {
             String topic = recordMetadata.topic();  // 消息发送到的topic
             int partition = recordMetadata.partition(); // 消息发送到的分区
             long offset = recordMetadata.offset();  // 消息在分区内的offset
-            log.info("parseProject() 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
+            log.info("发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
             //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
             // 选一个 count 最少的 node
             String currentNodeName = "";
             NodeTO currentNodeTO = null;
             int currentCount = Integer.MAX_VALUE;
-            log.info("parseProject() 各节点已经预定的任务个数为:" + nodeListToCount);
+            log.info("各节点已经预定的任务个数为:" + nodeListToCount);
             for (NodeTO nodeTO : nodeListToCount) {
                 int tempCount = nodeTO.getCount();
                 String tempNodeName = nodeTO.getNodeName();
@@ -550,7 +550,7 @@ public class ProjectConsumer {
                 }
             }
             if (currentNodeTO == null) {
-                String errorMessage = "parseProject() 挑选节点失败。";
+                String errorMessage = "挑选节点失败。";
                 log.info(errorMessage);
                 throw new RuntimeException(errorMessage);
             }
@@ -562,22 +562,22 @@ public class ProjectConsumer {
                 nodeMap0.put(currentNodeName, cpuOrder);
             }
             // 只有准备启动(即 currentCount == 0)的时候才指定 cpu 编号
-            log.info("parseProject() 创建任务 " + taskId + " 的 yaml:是否使用 gpu (0是1否)" + isChoiceGpu + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前节点名称为:" + currentNodeName + ",当前 cpu 编号为:" + cpuOrder);
+            log.info("创建任务 " + taskId + " 的 yaml:是否使用 gpu (0是1否)" + isChoiceGpu + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前节点名称为:" + currentNodeName + ",当前 cpu 编号为:" + cpuOrder);
             String yamlRedisKey = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
 
             if (currentCount == 0) {
                 String podName = yamlRedisKey.split(":")[yamlRedisKey.split(":").length - 1];
-                log.info("parseProject() 将 pod 加入到启动列表 " + podName);
+                log.info("将 pod 加入到启动列表 " + podName);
                 yamlToRunRedisKeyList.add(yamlRedisKey);
             }
             messageNumber++;
         }
         TimeUnit.SECONDS.sleep(6);
-        log.info("parseProject() 项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
+        log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
         for (String redisKey : yamlToRunRedisKeyList) {
-            projectUtil.createPodBegin(redisKey);
+            projectUtil.createPodBegin(projectId, redisKey);
         }
-        log.info("parseProject() 项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
+        log.info("项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
     }
 
 
@@ -595,7 +595,7 @@ public class ProjectConsumer {
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")
     @SneakyThrows
     public void stopProject(ConsumerRecord<String, String> stopRecord) {
-        log.info("stopProject() 接收到的项目终止消息为:" + stopRecord);
+        log.info("接收到的项目终止消息为:" + stopRecord);
         //1 读取 kafka 的项目停止信息
         /*
             {

+ 26 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/controller/NodeController.java

@@ -0,0 +1,26 @@
+package com.css.simulation.resource.scheduler.web.controller;
+
+import com.css.simulation.resource.scheduler.common.util.ProjectUtil;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+
+@RefreshScope
+@RestController
+@RequestMapping("/node")
+public class NodeController {
+    @Resource
+    private ProjectUtil projectUtil;
+
+
+    /**
+     * 修改任务状态
+     */
+    @PostMapping("/resetParallelism")
+    public void re() {
+        projectUtil.resetNodeParallelism();
+    }
+}

+ 9 - 8
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/web/controller/TaskController.java

@@ -2,24 +2,21 @@ package com.css.simulation.resource.scheduler.web.controller;
 
 
 import com.css.simulation.resource.scheduler.service.TaskService;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.cloud.context.config.annotation.RefreshScope;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.annotation.Resource;
+
 @RefreshScope
 @RestController
 @RequestMapping("/task")
 public class TaskController {
 
-    TaskService taskService;
-
-    @Autowired
-    public TaskController(TaskService taskService) {
-        this.taskService = taskService;
-    }
+    @Resource
+    private TaskService taskService;
 
     // -------------------------------- Comment --------------------------------
 
@@ -27,7 +24,11 @@ public class TaskController {
      * 修改任务状态
      */
     @GetMapping("/state")
-    public void taskState(@RequestParam("taskId") String taskId, @RequestParam("state") String state, @RequestParam("podName") String podName) {
+    public void taskState(
+            @RequestParam("taskId") String taskId,
+            @RequestParam("state") String state,
+            @RequestParam("podName") String podName
+    ) {
         taskService.taskState(taskId, state, podName);
     }