martin 3 лет назад
Родитель
Сommit
2a77fa1891

+ 54 - 0
api-common/src/main/java/api/common/util/FileUtil.java

@@ -1,5 +1,8 @@
 package api.common.util;
 
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.springframework.util.ResourceUtils;
 
 import javax.servlet.http.HttpServletResponse;
@@ -300,6 +303,57 @@ public class FileUtil {
         copyBytes(in, response.getOutputStream(), bufferSize);
     }
 
+    public static Set<String> decompress(InputStream inputStream, String targetDirectoryPath, String fileType) throws IOException {
+        Set<String> fileList = new HashSet<>();
+        if (!targetDirectoryPath.endsWith("/")) {
+            targetDirectoryPath += "/";
+        }
+        if (".tar".equals(fileType)) {
+            TarArchiveInputStream tarArchiveInputStream = new TarArchiveInputStream(inputStream);
+            TarArchiveEntry entry;
+            // 将 tar 文件解压到 extractPath 目录下
+            while ((entry = tarArchiveInputStream.getNextTarEntry()) != null) {
+                if (entry.isDirectory()) {
+                    continue;
+                }
+                createDirectory(targetDirectoryPath);
+                File currentFile = new File(targetDirectoryPath + entry.getName());
+                File parent = currentFile.getParentFile();
+                if (!parent.exists()) {
+                    boolean mkdirs = parent.mkdirs();
+                }
+                fileList.add(currentFile.getAbsolutePath());
+                // 将文件写出到解压的目录
+                copyBytes(tarArchiveInputStream, new FileOutputStream(currentFile), 4096);
+            }
+
+
+        }
+        if (".tar.gz".equals(fileType)|| ".tgz".equals(fileType)) {
+            TarArchiveInputStream tarArchiveInputStream = new TarArchiveInputStream((new GzipCompressorInputStream(inputStream)));
+            TarArchiveEntry entry;
+            // 将 tar 文件解压到 extractPath 目录下
+            while ((entry = tarArchiveInputStream.getNextTarEntry()) != null) {
+                if (entry.isDirectory()) {
+                    continue;
+                }
+                createDirectory(targetDirectoryPath);
+                File currentFile = new File(targetDirectoryPath + entry.getName());
+                File parent = currentFile.getParentFile();
+                if (!parent.exists()) {
+                    boolean mkdirs = parent.mkdirs();
+                }
+                fileList.add(currentFile.getAbsolutePath());
+                // 将文件写出到解压的目录
+                copyBytes(tarArchiveInputStream, new FileOutputStream(currentFile), 4096);
+            }
+
+
+        }
+        inputStream.close();
+        return fileList;
+    }
+
     // -------------------------------- F --------------------------------
     // -------------------------------- G --------------------------------
 

+ 4 - 2
api-common/src/main/java/api/common/util/SshUtil.java

@@ -3,6 +3,7 @@ package api.common.util;
 
 import api.common.pojo.dto.GpuDTO;
 import api.common.pojo.dto.ProcessDTO;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.channel.ChannelExec;
 import org.apache.sshd.client.channel.ClientChannelEvent;
@@ -27,6 +28,7 @@ import java.util.*;
  * <version>${sshd-core.version}</version>
  * </dependency>
  */
+@Slf4j
 public class SshUtil {
 
     private static final int port = 22;
@@ -75,6 +77,7 @@ public class SshUtil {
      * @throws IOException 异常
      */
     public static String execute(ClientSession session, String command) throws IOException {
+        log.info("SshUtil--execute 通过远程链接执行命令:" + command);
         String result;
         ChannelExec execChannel = session.createExecChannel(command);
         // 创建输出流
@@ -94,11 +97,10 @@ public class SshUtil {
      * 获取主机名
      */
     public static String hostname(ClientSession session) throws IOException {
-        return SshUtil.execute(session, "hostname").replace("\n","");
+        return SshUtil.execute(session, "hostname").replace("\n", "");
     }
 
 
-
     /**
      * 获取 cpu 使用率
      */

Разница между файлами не показана из-за своего большого размера
+ 1 - 292
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java


+ 28 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/AlgorithmController.java

@@ -0,0 +1,28 @@
+package com.css.simulation.resource.scheduler.controller;
+
+
+import api.common.pojo.common.ResponseBodyVO;
+import com.css.simulation.resource.scheduler.service.AlgorithmService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@RefreshScope
+@RestController
+@RequestMapping("/algorithm")
+public class AlgorithmController {
+
+    @Autowired
+    AlgorithmService algorithmService;
+
+
+    @RequestMapping("/check")
+    public ResponseBodyVO<String> check(
+            @RequestParam("minioPath") String minioPath
+    ) {
+        return algorithmService.check(minioPath);
+    }
+
+}

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -56,7 +56,7 @@ public interface TaskMapper {
     void updateSuccessStateWithStopTime(@Param("id") String id, @Param("runState") String runState, @Param("runStopTime") Timestamp runStopTime);
 
     @Update("update simulation_manual_project_task\n" +
-            "set run_state = #{runState},run_end_time = #{runStopTime},run_result='Failed'\n" +
+            "set run_state = #{runState},run_end_time = #{runStopTime},run_result='Failed',score = 0\n" +
             "where id = #{id}" +
             " and score is null")
     void updateFailStateWithStopTime(@Param("id") String id, @Param("runState") String runState, @Param("runStopTime") Timestamp runStopTime);

+ 9 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -38,11 +38,18 @@ public class TickScheduler {
     @Value("${scheduler.manual-project.job-yaml}")
     String jobYaml;
 
+    @Value("${scheduler.score.hostname}")
+    String hostnameScore;
+    @Value("${scheduler.score.username}")
+    String usernameScore;
+    @Value("${scheduler.score.password}")
+    String passwordScore;
+
     @Scheduled(fixedDelay = 60 * 1000)
     public void tick() throws IOException {
 
         SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, "182.92.203.182", "root", "CICV2022test");
+        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
 
         ArrayList<TaskPO> executingTaskList = taskMapper.selectExecuting();
         if (CollectionUtil.isEmpty(executingTaskList)) {
@@ -84,7 +91,7 @@ public class TickScheduler {
     @Scheduled(fixedDelay = 30 * 1000)
     public void checkProject() throws IOException {
         SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, "182.92.203.182", "root", "CICV2022test");
+        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
 
         //1 查询出正在运行中的 project
         List<String> projectIdList = projectMapper.selectIdByState("20");

+ 54 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/AlgorithmService.java

@@ -0,0 +1,54 @@
+package com.css.simulation.resource.scheduler.service;
+
+import api.common.pojo.common.ResponseBodyVO;
+import api.common.util.FileUtil;
+import com.css.simulation.resource.scheduler.mapper.AlgorithmMapper;
+import com.css.simulation.resource.scheduler.util.MinioUtil;
+import io.minio.MinioClient;
+import lombok.SneakyThrows;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.io.InputStream;
+import java.util.Set;
+
+@Service
+public class AlgorithmService {
+
+    @Autowired
+    AlgorithmMapper algorithmMapper;
+    @Autowired
+    MinioClient minioClient;
+    @Value("${minio.bucket-name}")
+    String bucketName;
+    @Value("${scheduler.linux-temp-path}")
+    String linuxTempPath;
+
+    @SneakyThrows
+    public ResponseBodyVO<String> check(String minioPath) {
+
+
+        //1 根据 minio 路径获取输入流
+        InputStream inputStream = MinioUtil.downloadToStream(minioClient, bucketName, minioPath);
+        //2 将输入流解压到临时目录
+        String algorithmTarLinuxTempPath = linuxTempPath +  minioPath;
+        Set<String> pathList = FileUtil.decompress(inputStream, algorithmTarLinuxTempPath, ".tgz");
+        //3 获取文件列表中是否又 docker-entrypoint.sh
+        boolean result = false;
+        for (String path : pathList) {
+            if (path.contains("docker-entrypoint.sh")) {
+                result = true;
+                break;
+            }
+        }
+
+        FileUtil.rm(algorithmTarLinuxTempPath);
+        if (result) {
+            return new ResponseBodyVO<>(ResponseBodyVO.Response.SUCCESS);
+        } else {
+            return new ResponseBodyVO<>(ResponseBodyVO.Response.CLIENT_FAILURE);
+        }
+
+    }
+}

+ 4 - 7
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -111,7 +111,7 @@ public class TaskService {
         redisTemplate.opsForValue().set("podName:" + taskId, podName);
         String podDeleteCommand = "kubectl delete pod " + podName;
         SshClient client = SshUtil.getClient();
-        ClientSession session = SshUtil.getSession(client, "182.92.203.182", "root", "CICV2022test");
+        ClientSession session = SshUtil.getSession(client, hostnameScore, usernameScore, passwordScore);
         if ("Running".equals(state)) {
             log.info("TaskService--state 修改任务 " + taskId + "的状态为 Running,pod 名称为:" + podName);
             taskMapper.updateStateWithStartTime(taskId, state, TimeUtil.getNowForMysql());
@@ -279,18 +279,15 @@ public class TaskService {
 
             // 计算不合格的任务数(不到100分就是不合格,执行失败的不算)
             long notStandardSceneNumber = taskListOfLeafIndex.stream()
-                    .filter(task -> task.getScore() < 100 && task.getScore() != 0.0 && DictConstants.TASK_COMPLETED.equals(task.getRunState()))
+                            .filter(task -> task.getScore() < 100)
                     .count();
 
             // 计算总分
             double leafSum = taskListOfLeafIndex.stream()
-                    .filter(task -> DictConstants.TASK_COMPLETED.equals(task.getRunState()))
                     .mapToDouble(TaskPO::getScore)
                     .sum();
-            // 计算成功执行的个数
-            long resultNumberOfCurrentIndex = taskListOfLeafIndex.stream()
-                    .filter(task -> DictConstants.TASK_COMPLETED.equals(task.getRunState()))
-                    .count();
+            // 计算任务的个数
+            long resultNumberOfCurrentIndex = taskListOfLeafIndex.size();
             log.info("TaskService--state 项目 " + projectId + " 的叶子指标" + indexId + "下成功执行的场景数量为:" + resultNumberOfCurrentIndex);
             double leafIndexScore = resultNumberOfCurrentIndex == 0 ? 0 : NumberUtil.cut(leafSum / resultNumberOfCurrentIndex, 2);
             // -------------------------------- 保存叶子指标得分 --------------------------------

Некоторые файлы не были показаны из-за большого количества измененных файлов