martin 3 лет назад
Родитель
Сommit
f514bf5d70

+ 49 - 3
api-common/src/main/java/api/common/util/FileUtil.java

@@ -1,18 +1,44 @@
 package api.common.util;
 
 import org.springframework.util.ResourceUtils;
-import org.springframework.web.multipart.MultipartFile;
 
 import javax.servlet.http.HttpServletResponse;
 import java.io.*;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.*;
 
 public class FileUtil {
 
+
+    /**
+     * Reads the content of the file at {@code path} as a string.
+     *
+     * <p>The path is resolved with {@code getFile(path)} and the result is passed to the
+     * matching {@code read} overload.
+     *
+     * @param path location handed to {@code getFile}
+     * @return the content produced by the delegated {@code read} overload
+     * @throws Exception if {@code getFile} or the delegated read fails
+     */
+    public static String read(String path) throws Exception {
+        return read(getFile(path));
+    }
+
+    /**
+     * Alias of {@link #read(String)}: returns the content of the file at {@code path}.
+     *
+     * @param path location of the file to read
+     * @return the file content as a string
+     * @throws Exception if the file cannot be resolved or read
+     */
+    public static String cat(String path) throws Exception {
+        return read(path);
+    }
+
+    /**
+     * Reads the content of {@code file} as a string.
+     *
+     * <p>The stream is opened in a try-with-resources statement so the file handle is released
+     * even when reading fails part-way.
+     *
+     * @param file file to read
+     * @return the content returned by {@link #read(InputStream)}
+     * @throws IOException if the file cannot be opened or read
+     */
+    public static String read(File file) throws IOException {
+        try (InputStream in = new FileInputStream(file)) {
+            return read(in);
+        }
+    }
+
+    /**
+     * Reads {@code inputStream} to its end and returns the bytes decoded as UTF-8.
+     *
+     * <p>All bytes are collected before decoding, so a multi-byte character that straddles a
+     * buffer boundary is decoded correctly and no separator is inserted between chunks. The
+     * returned string is exactly the stream content; no trailing newline is added. UTF-8 matches
+     * the encoding used by {@code writeStringToLocalFile}.
+     *
+     * <p>The stream is always closed, whether reading succeeds or fails.
+     *
+     * @param inputStream stream to drain; closed by this method
+     * @return the decoded stream content, or an empty string for an empty stream
+     * @throws IOException if reading from the stream fails
+     */
+    public static String read(InputStream inputStream) throws IOException {
+        try (InputStream in = inputStream) {
+            ByteArrayOutputStream collected = new ByteArrayOutputStream();
+            byte[] buf = new byte[4096]; // scratch buffer for each read
+            int len; // number of bytes delivered by the last read
+            while ((len = in.read(buf)) != -1) { // -1 marks end of stream
+                collected.write(buf, 0, len);
+            }
+            // Decode once, after all bytes are available, with an explicit charset.
+            return new String(collected.toByteArray(), StandardCharsets.UTF_8);
+        }
+    }
+
+
     // -------------------------------- A --------------------------------
 
     /**
@@ -676,13 +702,34 @@ public class FileUtil {
     // -------------------------------- V --------------------------------
     // -------------------------------- W --------------------------------
 
+    /**
+     * 将字符串保存为本地文件
+     */
+    public static void writeStringToLocalFile(String string, String filePath) throws IOException {
+        writeInputStreamToLocalFile(new ByteArrayInputStream(string.getBytes(StandardCharsets.UTF_8)), filePath);
+    }
+
+    /**
+     * 将字符串保存为本地文件
+     */
+    public static void writeStringToLocalFile(String string, String filePath, int bufferLength) throws IOException {
+        writeInputStreamToLocalFile(new ByteArrayInputStream(string.getBytes(StandardCharsets.UTF_8)), filePath, bufferLength);
+    }
+
 
     /**
      * 将输入流保存为本地文件
      */
     public static void writeInputStreamToLocalFile(InputStream inputStream, String filePath) throws IOException {
+        // Delegate to the three-argument overload with a buffer length of 4096.
+        writeInputStreamToLocalFile(inputStream, filePath, 4096);
+    }
+
+    /**
+     * 将输入流保存为本地文件
+     */
+    public static void writeInputStreamToLocalFile(InputStream inputStream, String filePath, int bufferLength) throws IOException {
         BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
-        byte[] data = new byte[1024];
+        byte[] data = new byte[bufferLength];
         int dataLength;
         File file = new File(filePath);
         createParentDirectory(file);
@@ -713,5 +760,4 @@ public class FileUtil {
     }
 
 
-
 }

+ 6 - 6
pom.xml

@@ -135,12 +135,12 @@
             <!-- 权限认证 - 结束 -->
 
 
-            <!-- api 文档 -->
-            <dependency>
-                <groupId>io.github.yedaxia</groupId>
-                <artifactId>japidocs</artifactId>
-                <version>${japidocs.version}</version>
-            </dependency>
+<!--            &lt;!&ndash; api 文档 &ndash;&gt;-->
+<!--            <dependency>-->
+<!--                <groupId>io.github.yedaxia</groupId>-->
+<!--                <artifactId>japidocs</artifactId>-->
+<!--                <version>${japidocs.version}</version>-->
+<!--            </dependency>-->
 
             <!-- 数据库 - 开始 -->
             <!-- page helper -->

+ 7 - 8
simulation-resource-monitor/src/main/java/com/css/simulation/resource/monitor/scheduler/Statistic.java

@@ -1,18 +1,17 @@
 package com.css.simulation.resource.monitor.scheduler;
 
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+/**
+ * Spring-managed component ({@code @Component}) that currently declares no members.
+ *
+ * <p>The {@code df()} job, scheduled with a fixed delay of {@code 60 * 60 * 1000}, is kept
+ * below only as commented-out code.
+ */
 @Component
 public class Statistic {
 
-    /**
-     * 内存使用率统计。
-     */
-    @Scheduled(fixedDelay = 60 * 60 * 1000)
-    public void df() {
-        diskFreeService.getDiskFree();
-    }
+//    /**
+//     * Memory usage statistics.
+//     * NOTE(review): the call below is getDiskFree(), so this probably means disk usage - confirm.
+//     */
+//    @Scheduled(fixedDelay = 60 * 60 * 1000)
+//    public void df() {
+//        diskFreeService.getDiskFree();
+//    }
 
 
 }

+ 38 - 55
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -7,11 +7,7 @@ import api.common.util.*;
 import com.css.simulation.resource.scheduler.mapper.*;
 import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.*;
-import com.css.simulation.resource.scheduler.util.MinioUtil;
 import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.openapi.apis.BatchV1Api;
-import io.kubernetes.client.openapi.models.V1Job;
-import io.kubernetes.client.util.Yaml;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -23,7 +19,6 @@ import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -208,7 +203,8 @@ public class ManualProjectConsumer {
             //4-4 将对象转成 json
             String taskJson = JsonUtil.beanToJson(taskTO);
             //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
-            kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
+//            kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
+            kafkaTemplate.send("test", taskJson).addCallback(success -> {
                 // 消息发送到的topic
                 String topic = success.getRecordMetadata().topic();
                 // 消息发送到的分区
@@ -229,60 +225,47 @@ public class ManualProjectConsumer {
         // 私有仓库导入算法镜像
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
         //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
-        AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
-        if (algorithmPO == null){
-            // 访问索为远程接口
-        }
-        String minioPath = algorithmPO.getMinioPath();
-        String dockerImage;
-        if ("0".equals(algorithmPO.getDockerImport())) {
-            dockerImage = "algorithm_" + algorithmId + ":latest";
-            String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
-            // 下载算法文件到本地( 2 到仓库服务器)
-            MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
-            //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
-            LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
-        } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
-            dockerImage = algorithmPO.getDockerImage();
-        } else {
-            throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
-        }
+//        AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
+//        if (algorithmPO == null){
+//            // 访问索为远程接口
+//        }
+//        String minioPath = algorithmPO.getMinioPath();
+//        String dockerImage;
+//        if ("0".equals(algorithmPO.getDockerImport())) {
+//            dockerImage = "algorithm_" + algorithmId + ":latest";
+//            String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
+//            // 下载算法文件到本地( 2 到仓库服务器)
+//            MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
+//            //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
+//            LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
+//        } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
+//            dockerImage = algorithmPO.getDockerImage();
+//        } else {
+//            throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
+//        }
         // -------------------------------- 5 创建 pod 开始执行 --------------------------------
         int completions = sceneList.size();     // 结束标
         int parallelism = projectMessageDTO.getParallelism();    // 并行度
         log.info("------- ManualProjectConsumer 项目 " + projectId + " 的完成度为:" + completions);
         log.info("------- ManualProjectConsumer 项目 " + projectId + " 的并行度为:" + parallelism);
-        BatchV1Api batchV1Api = new BatchV1Api(apiClient);
-        V1Job yaml = (V1Job) Yaml.load(new File("/opt/simulation-cloud/simulation-resource-scheduler/conf/job-template.yaml"));
-//        V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-test.yaml"));
-//        //1 apiVersion
-//        //2 kind
-//        //3 metadata
-//        V1ObjectMeta metadata = yaml.getMetadata();
-//        metadata.setName("project_" + projectId);
-//        yaml.setMetadata(metadata);
-//        //4 job
-//        V1JobSpec job = yaml.getSpec();
-//        job.setCompletions(completions); // 这个标准是什么?
-//        job.setParallelism(parallelism);
-//        //5 pod
-//        V1PodSpec v1PodSpec = job.getTemplate().getSpec();
-//        //6 container
-//        List<V1Container> containers = v1PodSpec.getContainers();
-//        for (V1Container container : containers) {
-//            String name = container.getName();
-//            if ("vtd".equals(name)) {
-//                container.setName("vtd_" + projectId);
-//            }
-//            if ("algorithm".equals(name)) {
-//                container.setName("algorithm_" + projectId);
-////                container.setImage(dockerImage);
-//            }
-//        }
-//        //4-4 创建
-//        yaml.setSpec(job);
-        log.info("------- ManualProjectConsumer 创建 job:" + yaml);
-        batchV1Api.createNamespacedJob("simulation-cloud", yaml, null, null, null);
+        String jobTemplateYamlPathSource = "/opt/simulation-cloud/simulation-resource-scheduler/job-template/job-template.yaml";
+//        String jobTemplateYamlPathSource = "D:\\temp\\job-template.yaml";
+        String jobTemplateYamlPathTarget = "/opt/simulation-cloud/simulation-resource-scheduler/job-yaml/" + "project-" + projectId + ".yaml";
+        String yamlSource = FileUtil.read(jobTemplateYamlPathSource);
+        log.info("------- ManualProjectConsumer 模板文件为:" + yamlSource);
+        String replace0 = yamlSource.replace("job-cloud-simulation", "project-" + projectId);
+        String replace1 = replace0.replace("vtd-container", "vtd-" + projectId);
+        String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
+        int i = replace2.indexOf("completions:");
+        int j = replace2.indexOf("parallelism:");
+        StringBuilder stringBuilder = new StringBuilder(replace2);
+        stringBuilder.replace(i + "completions: ".length(), i + "completions: ".length() + 1, completions + "");
+        stringBuilder.replace(j + "parallelism: ".length(), j + "parallelism: ".length() + 1, parallelism + "");
+        String yamlTarget0 = stringBuilder.toString();
+        String yamlTarget1 = yamlTarget0.replace("apiVers1on", "apiVersion");
+        log.info("------- ManualProjectConsumer 开始执行 yaml 文件" + yamlTarget1);
+        FileUtil.writeStringToLocalFile(yamlTarget1, jobTemplateYamlPathTarget);
+        LinuxUtil.execute("kubectl apply -f " + jobTemplateYamlPathTarget);
     }
 
 

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/ProjectMapper.java

@@ -39,6 +39,6 @@ public interface ProjectMapper {
+    /**
+     * Counts the tasks of one project whose {@code run_state} is {@code Aborted},
+     * {@code PendingAnalysis} or {@code Terminated}.
+     *
+     * @param projectId value compared with the {@code p_id} column
+     * @return number of matching rows in {@code simulation_manual_project_task}
+     */
     @Select("select count(1)\n" +
             "from simulation_manual_project_task\n" +
             "where run_state in ('Aborted', 'PendingAnalysis', 'Terminated')\n" +
-            "  and p_id in #{projectId}")
+            "  and p_id = #{projectId}")
     int selectEndTaskNum(@Param("projectId") String projectId);
 }

+ 24 - 11
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -1,6 +1,7 @@
 package com.css.simulation.resource.scheduler.scheduler;
 
 import api.common.pojo.constants.DictConstants;
+import api.common.util.CollectionUtil;
 import api.common.util.TimeUtil;
 import com.css.simulation.resource.scheduler.mapper.TaskMapper;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
@@ -12,6 +13,7 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
+import java.util.Optional;
 
 @Component
 @Slf4j
@@ -27,23 +29,34 @@ public class TickScheduler {
 
     @Scheduled(fixedDelay = 2000)
     public void tick() {
-        log.info("------- TickScheduler 查询出所有执行中的任务('Running', 'Analysis', 'Analysing')");
+
         List<TaskPO> executingTaskList = taskMapper.selectExecuting();
+        if (CollectionUtil.isEmpty(executingTaskList)) {
+            return;
+        }
+        log.info("------- TickScheduler 查询出所有执行中的任务('Running', 'Analysis', 'Analysing'):" + executingTaskList);
         //2 根据 key 查出任务的心跳时间
         executingTaskList.forEach(task -> {
             String taskId = task.getId();
             String projectId = task.getPId();
-            long tickTime = Long.parseLong(redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId));
-            long maxSimulationTime = task.getMaxSimulationTime() * 1000;
-            long now = TimeUtil.getNow();
-            long difference = TimeUtil.getNow() - tickTime;
-            log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime+ "最大仿真时间为:" + tickTime+ "时间差为:" + tickTime);
-            log.info("------- TickScheduler 任务" + taskId );
-            log.info("------- TickScheduler 任务" + taskId );
-            if (TimeUtil.getNow() - tickTime > maxSimulationTime) {
-            log.info("------- TickScheduler 任务" + taskId + "已超时,状态修改为:"+DictConstants.TASK_ABORTED);
-                taskMapper.updateState(taskId, DictConstants.TASK_ABORTED);
+            try {
+                String s = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId);
+                Optional.ofNullable(s).orElseThrow(() -> new RuntimeException("项目 " + projectId + " 下的任务 " + taskId + " 的心跳查询失败"));
+                long tickTime = Long.parseLong(s);
+                long maxSimulationTime = task.getMaxSimulationTime() * 1000;
+                long now = TimeUtil.getNow();
+                long difference = now - tickTime;
+                log.info("------- TickScheduler 任务" + taskId + "心跳时间为:" + tickTime + "最大仿真时间为:" + tickTime + "时间差为:" + tickTime);
+                log.info("------- TickScheduler 任务" + taskId);
+                log.info("------- TickScheduler 任务" + taskId);
+                if (difference > maxSimulationTime) {
+                    log.info("------- TickScheduler 任务" + taskId + "已超时,状态修改为:" + DictConstants.TASK_ABORTED);
+                    taskMapper.updateState(taskId, DictConstants.TASK_ABORTED);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e.getMessage());
             }
+
         });
 
     }

+ 5 - 4
simulation-resource-scheduler/src/main/resources/kubernetes/template/job/job-template.yaml

@@ -7,12 +7,14 @@ metadata:
   labels:
     user: EY
 spec:
+  completions: 1
+  parallelism: 1
   template:
     metadata:
       name: pod-cloud-simulation
     spec:
       containers:
-        - name: vtd
+        - name: vtd-container
           image: vtd.run.perception:latest
           imagePullPolicy: Never
           command: [ "/Controller/VTDController", "/Controller/config/docker_cloud.ini" ]
@@ -26,7 +28,7 @@ spec:
               mountPath: /dev/nvidiactl
           securityContext:
             privileged: true
-        - name: algorithm
+        - name: algorithm-container
           image: aeb.ros:latest
           imagePullPolicy: Never
           command: [ "/AEB/start_docker.sh" ]
@@ -37,5 +39,4 @@ spec:
             path: /dev/nvidia0
         - name: nvidiactl
           hostPath:
-            path: /dev/nvidiactl
-
+            path: /dev/nvidiactl