Jelajahi Sumber

创建pod时添加分区和偏移量

root 2 tahun lalu
induk
melakukan
daa2acb511

+ 2 - 1
api-common/src/main/java/api/common/util/FileUtil.java

@@ -75,8 +75,9 @@ public class FileUtil {
         return read(getFile(path));
     }
 
+    @SneakyThrows
     public static String read(File file) throws IOException {
-        return read(new FileInputStream(file));
+        return read(Files.newInputStream(file.toPath()));
     }
 
     public static String read(InputStream inputStream) throws IOException {

+ 15 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/TaskController.java

@@ -31,6 +31,21 @@ public class TaskController {
         taskService.taskState(taskId, state, podName);
     }
 
+    /**
+     * 修改任务状态
+     */
+    @GetMapping("/state")
+    public void taskState(
+            @RequestParam("taskId") String taskId,
+            @RequestParam("state") String state,
+            @RequestParam("podName") String podName,
+            @RequestParam("partition") String partition,
+            @RequestParam("offset") String offset
+
+    ) {
+        taskService.taskState(taskId, state, podName);
+    }
+
     /**
      * Pod 的心跳接口
      */

+ 52 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/ProjectManager.java

@@ -0,0 +1,52 @@
+package com.css.simulation.resource.scheduler.manager;
+
+import api.common.util.FileUtil;
+import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
+import com.css.simulation.resource.scheduler.util.ProjectUtil;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.io.File;
+
+@Component
+@Slf4j
+public class ProjectManager {
+
+    @Value("${scheduler.linux-path.pod-template-yaml}")
+    String podTemplateYaml;
+    @Value("${scheduler.linux-path.pod-yaml-directory}")
+    String podYamlDirectory;
+    @Resource
+    KubernetesConfiguration kubernetesConfiguration;
+    @Resource
+    ProjectUtil projectUtil;
+
+
+    /**
+     * 创建一个临时 yaml,node 在最后用 # 号隔开
+     *
+     * @param projectId
+     * @param algorithmDockerImage
+     */
+    @SneakyThrows
+    public void createTempYaml(String projectId, String algorithmDockerImage, String nodeName) {
+        String podName = projectUtil.getRandomPodName(projectId);   // 生成 podName
+        String podTemplateFileNameOfProject = projectUtil.getPodYamlName(podName, nodeName);     // 模板文件名称
+        String podString = FileUtil.read(new File(podTemplateYaml));
+        String replace0 = podString.replace("vtd-container", "vtd-" + projectId);
+        String replace1 = replace0.replace("vtd-image", kubernetesConfiguration.getVtdImage());
+        String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
+        String replace3 = replace2.replace("algorithm-image", algorithmDockerImage);
+        String replace4 = replace3.replace("kafkaTopic", projectId);     // 消息主题名称为 projectId
+        String replace5 = replace4.replace("pod-name", podName); // pod 名称包括 projectId 和 随机字符串
+        String replace6 = replace5.replace("namespace-name", kubernetesConfiguration.getNamespace()); // pod 名称包括 projectId 和 随机字符串
+        String replace7 = replace6.replace("node-name", nodeName);     // 指定 pod 运行节点
+//        log.info("ProjectService--createPod 在节点 " + nodeName + " 开始执行 pod:" + tempPodString);
+        FileUtil.writeStringToLocalFile(replace7, podYamlDirectory + podTemplateFileNameOfProject);
+//        log.info("ProjectService--createPod 在节点 " + nodeName + " 开始执行 pod。");
+//        projectUtil.createPod(nodeName, podName, tempPodString);
+    }
+}

+ 5 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -304,6 +304,11 @@ public class ProjectService {
                     // 消息在分区内的offset
                     long offset = success.getRecordMetadata().offset();
                     log.info("------- ProjectConsumer 发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset + " 消息体为:" + finalTaskJson);
+                    //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
+
+
+
+
                 }, failure -> log.error("------- 发送消息失败:" + failure.getMessage()));
                 messageNumber[0] = messageNumber[0] + 1;
             });

+ 7 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -74,6 +74,10 @@ public class ProjectUtil {
         return "project-" + projectId + "-" + StringUtil.getRandomEightBitUUID();
     }
 
+    public String getPodYamlName(String podName, String nodeName) {
+        return nodeName + "#" + podName + ".yaml";
+    }
+
     @SneakyThrows
     public void deletePod(String podName) {
         String key = "pod:" + podName + ":node";
@@ -117,7 +121,7 @@ public class ProjectUtil {
     @SneakyThrows
     public void createPod(String nodeName, String podName, String podYamlContent) {
         stringRedisTemplate.opsForValue().set("pod:" + podName + ":node", nodeName);    // 将 pod 运行在哪个 node 上记录到 redis
-        String podYamlName = podName + ".yaml";     // 模板文件名称
+        String podYamlName = getNodeNameOfPod(podName);
         String podYamlPath = podYamlDirectory + podYamlName;
         FileUtil.writeStringToLocalFile(podYamlContent, podYamlPath);
         KubernetesUtil.createNs(apiClient, kubernetesConfiguration.getNamespace());
@@ -127,6 +131,8 @@ public class ProjectUtil {
 //        KubernetesUtil.createPod(apiClient, kubernetesNamespace, v1Pod);
     }
 
+
+
     /**
      * 更改一个名字继续启动
      *