martin 2 lat temu
rodzic
commit
3e716d3dea

+ 5 - 1
api-common/src/main/java/api/common/util/FileUtil.java

@@ -1,5 +1,6 @@
 package api.common.util;
 
+import lombok.SneakyThrows;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
@@ -36,7 +37,8 @@ public class FileUtil {
     }
 
 
-    public static String read(String path) throws Exception {
+    @SneakyThrows
+    public static String read(String path){
         return read(getFile(path));
     }
 
@@ -811,6 +813,7 @@ public class FileUtil {
     /**
      * 将字符串保存为本地文件
      */
+    @SneakyThrows
     public static void writeStringToLocalFile(String string, String filePath) throws IOException {
         writeInputStreamToLocalFile(new ByteArrayInputStream(string.getBytes(StandardCharsets.UTF_8)), filePath);
     }
@@ -818,6 +821,7 @@ public class FileUtil {
     /**
      * 将字符串保存为本地文件
      */
+    @SneakyThrows
     public static void writeStringToLocalFile(String string, String filePath, int bufferLength) throws IOException {
         writeInputStreamToLocalFile(new ByteArrayInputStream(string.getBytes(StandardCharsets.UTF_8)), filePath, bufferLength);
     }

+ 21 - 0
api-common/src/main/java/api/common/util/StringUtil.java

@@ -5,6 +5,27 @@ import java.util.UUID;
 
 public class StringUtil {
 
+    /**
+     * Builds a string from the characters of the given array via {@link String#valueOf(char[])}.
+     * @param charArray source characters
+     * @return string holding the characters of {@code charArray}
+     */
+    public static String charArrayToString(char[] charArray) {
+        // Per the JDK docs, String.valueOf(char[]) copies the array, so later changes to charArray do not affect the result.
+        return String.valueOf(charArray);
+    }
+
+    /**
+     * Returns the given string reference unchanged; no new {@code String} object is created.
+     * @param string string to hand back ({@code null} is returned as-is)
+     * @return the same reference that was passed in
+     */
+    public static String copy(String string) {
+        // Java strings are immutable, so sharing the reference stands in for a copy here.
+        return string;
+    }
+
+
     public static String[] splitByBlank(String string) {
         return string.split("\\s+");
     }

+ 4 - 22
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -3,7 +3,6 @@ package com.css.simulation.resource.scheduler.consumer;
 
 import api.common.pojo.constants.DictConstants;
 import api.common.pojo.dto.ProjectMessageDTO;
-import api.common.util.CollectionUtil;
 import api.common.util.JsonUtil;
 import api.common.util.StringUtil;
 import com.css.simulation.resource.scheduler.mapper.*;
@@ -127,26 +126,10 @@ public class ProjectConsumer {
         int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
         // 获取该集群中正在运行的项目,如果没有则立即执行
         PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
-        Set<String> clusterRunningKeySet = stringRedisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
-        List<String> runningProjectSet;
-        if (CollectionUtil.isEmpty(clusterRunningKeySet)) {
-            run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
-            return;
-        }
-        runningProjectSet = projectUtil.getRunningProjectList(clusterRunningKeySet);
-        if (CollectionUtil.isEmpty(runningProjectSet)) {
-            run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
-            return;
-        }
-        // 计算正在运行的项目的并行度总和
-        long parallelismSum = 0;
-        for (String projectKey : runningProjectSet) {
-            String projectJsonTemp = stringRedisTemplate.opsForValue().get(projectKey);
-            ProjectMessageDTO projectMessageTemp = JsonUtil.jsonToBean(projectJsonTemp, ProjectMessageDTO.class);
-            parallelismSum += projectMessageTemp.getParallelism();
-        }
+        // 获取正在运行的项目的并行度总和
+        int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
         // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
-        if (parallelismSum + parallelism <= simulationLicenseNumber) {
+        if (currentParallelismSum + parallelism <= simulationLicenseNumber) {
             run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
         } else {
             wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
@@ -234,8 +217,7 @@ public class ProjectConsumer {
         // -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
         String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
-        //TODO
-        projectService.transferAndRunYaml(projectId, nodeMap, algorithmDockerImage);
+        projectService.createPod(projectId, nodeMap, algorithmDockerImage);
 
     }
 

+ 83 - 39
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -12,6 +12,12 @@ import com.css.simulation.resource.scheduler.util.MinioUtil;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.util.Yaml;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -24,6 +30,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
+import java.io.File;
 import java.io.InputStream;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -48,6 +55,8 @@ public class ProjectService {
     String username;
     @Value("${scheduler.host.password}")
     String password;
+    @Value("${scheduler.kubernetes.namespace}")
+    String kubernetesNamespace;
     @Resource
     StringRedisTemplate stringRedisTemplate;
     @Resource
@@ -59,10 +68,6 @@ public class ProjectService {
     @Resource
     MinioClient minioClient;
     @Resource
-    ManualProjectMapper manualProjectMapper;
-    @Resource
-    AutoSubProjectMapper autoSubProjectMapper;
-    @Resource
     TaskMapper taskMapper;
     @Resource
     IndexTemplateMapper indexTemplateMapper;
@@ -72,6 +77,8 @@ public class ProjectService {
     SceneMapper sceneMapper;
     @Resource
     AlgorithmMapper algorithmMapper;
+    @Resource
+    ApiClient apiClient;
 
 
     // -------------------------------- Comment --------------------------------
@@ -85,11 +92,12 @@ public class ProjectService {
     @Transactional
     public void prepare(Map<String, Integer> nodeMap, ProjectMessageDTO projectMessageDTO, String clusterPrefix, String projectRunningKey) {
         String projectId = projectMessageDTO.getProjectId();
+        String projectType = projectMessageDTO.getType();
         //1 将指定 node 的并行度减少
         nodeMap.keySet().forEach(nodeName -> {
-            long parallelismToUse = nodeMap.get(nodeName);
+            int parallelismToUse = nodeMap.get(nodeName);
             String restParallelismKey = "node:" + nodeName + ":parallelism";
-            long restParallelism = Long.parseLong(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(restParallelismKey)));// 剩余可用并行度
+            int restParallelism = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(restParallelismKey)));// 剩余可用并行度
             stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelismToUse) + "");
         });
         //2 将 redis 中该项目旧的信息则直接删除(包括 waitingKey)
@@ -102,12 +110,12 @@ public class ProjectService {
             }
         }
 
-        //4 将项目状态修改为执行中
-        if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
-            manualProjectMapper.updateNowRunStateById(DictConstants.PROJECT_RUNNING, projectId);   // 修改该 project 的状态为执行中,同时将已完成任务重置为 0 。
-        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
-            autoSubProjectMapper.updateNowRunStateById(DictConstants.PROJECT_RUNNING, projectId);
-        }
+//        //4 将项目状态修改为执行中(页面上已经改成了执行中,这里应该不需要)
+//        if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
+//            manualProjectMapper.updateNowRunStateById(DictConstants.PROJECT_RUNNING, projectId);
+//        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
+//            autoSubProjectMapper.updateNowRunStateById(DictConstants.PROJECT_RUNNING, projectId);
+//        }
 
         //5 将该 project 下所有旧的任务和指标得分删除。
         taskMapper.deleteByProject(projectId);
@@ -161,7 +169,7 @@ public class ProjectService {
      * @param projectRunningPrefix
      * @param userId
      * @param projectId
-     * @param maxSimulationTime
+     * @param videoTime            视频长度
      * @param scenePOSet
      * @param vehiclePO
      * @param cameraPOList
@@ -374,43 +382,79 @@ public class ProjectService {
         return dockerImage;
     }
 
+//    /**
+//     * 运行
+//     *
+//     * @param projectId            项目id
+//     * @param nodeMap              并行度
+//     * @param algorithmDockerImage 算法镜像
+//     */
+//    @SneakyThrows
+//    public void transferAndRunYaml(String projectId, Map<String, Integer> nodeMap, String algorithmDockerImage) {
+//        String podTemplate = FileUtil.read(podTemplateYaml);
+//        String replace0 = podTemplate.replace("vtd-container", "vtd-" + projectId);
+//        String replace1 = replace0.replace("algorithm-container", "algorithm-" + projectId);
+//        String replace2 = replace1.replace("algorithm-image", algorithmDockerImage);
+//        String replace3 = replace2.replace("kafkaTopic", projectId);     // 消息主题名称为 projectId
+//        nodeMap.forEach((nodeName, parallelism) -> {
+//            String tempPodNameSuffix = projectId + StringUtil.getRandomUUID();
+//            String tempReplace4 = replace3.replace("pod-name", "project-" + tempPodNameSuffix); // pod 名称包括 projectId 和 随机字符串
+//            String tempFinalYaml = tempReplace4.replace("node-name", nodeName);     // 指定 pod 运行节点
+//            log.info("ProjectConsumer--parseManualProject 在节点 " + nodeName + " 开始执行 pod:" + finalYaml);
+//            String tempFinalYamlTargetPath = podYamlDirectory + tempPodNameSuffix;
+//            FileUtil.writeStringToLocalFile(tempFinalYaml, tempFinalYamlTargetPath);
+//            //  启动
+//            KubernetesUtil.applyYaml(hostname, username, password, jobTemplateYamlPathTarget);
+//        });
+//    }
+
     /**
-     * 运行
+     * Creates the project's pods from the pod template, one per entry of nodeMap, each pinned to its node.
      *
      * @param projectId            项目id
      * @param nodeMap              并行度
      * @param algorithmDockerImage 算法镜像
      */
     @SneakyThrows
-    public void transferAndRunYaml(String projectId, Map<String, Integer> nodeMap, String algorithmDockerImage) {
-        String yamlSource = FileUtil.read(podTemplateYaml);
-//        log.info("ProjectConsumer--transferYaml 模板文件为:" + yamlSource);
-        String replace0 = yamlSource.replace("job-cloud-simulation", "project-" + projectId);
-        String replace1 = replace0.replace("vtd-container", "vtd-" + projectId);
-        String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
-        String replace3 = replace2.replace("algorithm-image", algorithmDockerImage);
-        String replace4 = replace3.replace("projectId", projectId);
-        String replace5 = replace4.replace("completions-number", completions + "");
-        String replace6 = replace5.replace("parallelism-number", parallelism + "");
-        String replace7 = replace6.replace("apiVers1on", "apiVersion");
-        String replace8 = replace7.replace("1atch/v1", "batch/v1");
-        String finalYaml = replace8.replace("node-name", nodeName);
-        log.info("ProjectConsumer--parseManualProject 在节点 " + nodeName + " 开始执行 pod:" + finalYaml);
-        FileUtil.writeStringToLocalFile(finalYaml, jobTemplateYamlPathTarget);
-        //  启动
-        KubernetesUtil.applyYaml(hostname, username, password, jobTemplateYamlPathTarget);
+    public void createPod(String projectId, Map<String, Integer> nodeMap, String algorithmDockerImage) {
+        // Load the pod template as an object and adjust it in memory before submitting it.
+        V1Pod v1Pod = (V1Pod) Yaml.load(new File(podTemplateYaml));
+        V1ObjectMeta metadata = v1Pod.getMetadata();
+        V1PodSpec spec = v1Pod.getSpec();
+        // Rename the template's placeholder containers so each one carries the project id.
+        for (V1Container container : spec.getContainers()) {
+            if ("vtd-container".equals(container.getName())) {
+                container.setName("vtd-" + projectId);
+                // The project id replaces the "kafkaTopic" placeholder argument of the container command.
+                List<String> commandList = container.getCommand();
+                if (commandList != null) {
+                    commandList.replaceAll(arg -> "kafkaTopic".equals(arg) ? projectId : arg);
+                }
+            } else if ("algorithm-container".equals(container.getName())) {
+                // Must differ from the vtd container's name: container names have to be unique within a pod.
+                container.setName("algorithm-" + projectId);
+                container.setImage(algorithmDockerImage);
+            }
+        }
+
+        // One pod per node; the shared template object is re-stamped with a fresh name and target node each time.
+        nodeMap.forEach((nodeName, parallelism) -> {
+            // Pod name = "project-" + projectId + random UUID, so every pod shares the "project-<id>" prefix.
+            String tempPodName = "project-" + projectId + StringUtil.getRandomUUID();
+            metadata.setName(tempPodName);
+            spec.setNodeName(nodeName);
+            KubernetesUtil.createPod(apiClient, kubernetesNamespace, v1Pod);
+        });
     }
 
+
     @SneakyThrows
-    public void stopProject(String projectId, String type) {
+    public void stopProject(String projectId, String projectType) {
+        //2 根据 pod 前缀删除所有 pod
+        String prefix = "project-" + projectId;
+        KubernetesUtil.deleteJob(apiClient, kubernetesNamespace, prefix);
 
-        // 需要判断项目在执行还是在等待
-//        if (DictConstants.PROJECT_TYPE_MANUAL.equals(type)) {
-//            manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql());
-//        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(type)) {
-//            autoSubProjectMapper.updateNowRunStateAndFinishTimeById(DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql(), projectId);
-//        }
-//        KubernetesUtil.deleteJob(apiClient, "default", "project-" + projectId);
+        //3 删除所有 redis key
 //        PrefixTO redisPrefix = projectUtil.getRedisPrefixByProjectIdAndProjectType(projectId, type);
 //        Set<String> keys = stringRedisTemplate.keys(redisPrefix.getProjectRunningKey() + "*");
 //        if (CollectionUtil.isNotEmpty(keys)) {

+ 22 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/KubernetesUtil.java

@@ -5,10 +5,8 @@ import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.openapi.ApiException;
 import io.kubernetes.client.openapi.apis.BatchV1Api;
 import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.models.V1JobList;
-import io.kubernetes.client.openapi.models.V1Namespace;
-import io.kubernetes.client.openapi.models.V1ObjectMeta;
-import io.kubernetes.client.openapi.models.V1PodList;
+import io.kubernetes.client.openapi.models.*;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
@@ -21,6 +19,26 @@ import java.util.stream.Collectors;
 @Slf4j
 public class KubernetesUtil {
 
+
+    /**
+     * Creates a pod in the given namespace through the core v1 API.
+     *
+     * @param apiClient     API client used to build the {@code CoreV1Api}
+     * @param namespaceName target namespace; {@code null} or an empty string selects "default"
+     * @param v1Pod         pod definition to submit
+     * @return the pod object returned by the API call
+     */
+    @SneakyThrows
+    public static V1Pod createPod(ApiClient apiClient, String namespaceName, V1Pod v1Pod) {
+        // A missing or empty namespace falls back to "default"; any other value is used as given.
+        String targetNamespace = namespaceName;
+        if (targetNamespace == null || targetNamespace.isEmpty()) {
+            targetNamespace = "default";
+        }
+        CoreV1Api coreV1Api = new CoreV1Api(apiClient);
+        return coreV1Api.createNamespacedPod(targetNamespace, v1Pod, null, null, null, null);
+    }
+
     public static void applyYaml(String hostname, String username, String password, String jobTemplateYamlPathTarget) throws IOException {
         SshClient client = SshUtil.getClient();
         ClientSession session = SshUtil.getSession(client, hostname, username, password);

+ 36 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -1,6 +1,9 @@
 package com.css.simulation.resource.scheduler.util;
 
 import api.common.pojo.constants.DictConstants;
+import api.common.pojo.dto.ProjectMessageDTO;
+import api.common.util.CollectionUtil;
+import api.common.util.JsonUtil;
 import api.common.util.StringUtil;
 import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
 import com.css.simulation.resource.scheduler.mapper.AutoSubProjectMapper;
@@ -10,6 +13,7 @@ import com.css.simulation.resource.scheduler.mapper.UserMapper;
 import com.css.simulation.resource.scheduler.pojo.po.UserPO;
 import com.css.simulation.resource.scheduler.pojo.to.KubernetesNodeTO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.kafka.core.KafkaTemplate;
@@ -41,6 +45,32 @@ public class ProjectUtil {
     @Resource
     StringRedisTemplate stringRedisTemplate;
 
+    /**
+     * Sums the parallelism currently in use by the running projects of a cluster.
+     *
+     * @param clusterRunningPrefix redis key prefix of the cluster; every key matching {@code prefix + "*"} is inspected
+     * @return total of {@code getCurrentParallelism()} over the running projects, 0 when there are none
+     */
+    @SneakyThrows
+    public int getCurrentParallelismSum(String clusterRunningPrefix) {
+        Set<String> clusterRunningKeySet = stringRedisTemplate.keys(clusterRunningPrefix + "*");
+        if (CollectionUtil.isEmpty(clusterRunningKeySet)) {
+            return 0;
+        }
+        List<String> runningProjectKeyList = getRunningProjectList(clusterRunningKeySet);
+        if (CollectionUtil.isEmpty(runningProjectKeyList)) {
+            return 0;
+        }
+        int result = 0;
+        for (String projectKey : runningProjectKeyList) {
+            String projectJson = stringRedisTemplate.opsForValue().get(projectKey);
+            if (projectJson != null) {   // the key may vanish between keys() and get(); such entries add nothing
+                result += JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class).getCurrentParallelism();
+            }
+        }
+        return result;
+    }
+
 
     /**
      * 根据并行度获取用于执行的节点列表
@@ -185,6 +215,12 @@ public class ProjectUtil {
     }
 
 
+    /**
+     * Keeps the keys for which {@code StringUtil.countSubString(key, ":")} equals 3 (presumably three ":" separators — confirm).
+     *
+     * @param clusterRunningKeySet keys found under the cluster (running and waiting projects alike, per the author's note)
+     * @return the matching redis keys as a list; callers read values with them, so they are keys rather than bare project ids
+     */
     public List<String> getRunningProjectList(Set<String> clusterRunningKeySet) {
         return clusterRunningKeySet.stream().filter(key -> StringUtil.countSubString(key, ":") == 3).collect(Collectors.toList());
     }