martin před 3 roky
rodič
revize
db15d1b53e

+ 9 - 0
api-common/pom.xml

@@ -15,10 +15,19 @@
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
         <jackson-core.version>2.13.1</jackson-core.version>
+        <sshd-netty.version>2.8.0</sshd-netty.version>
     </properties>
 
     <dependencies>
 
+
+
+        <dependency>
+            <groupId>org.apache.sshd</groupId>
+            <artifactId>sshd-netty</artifactId>
+            <version>${sshd-netty.version}</version>
+        </dependency>
+
         <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>

+ 2 - 0
api-common/src/main/java/api/common/pojo/param/MinioParameter.java

@@ -1,12 +1,14 @@
 package api.common.pojo.param;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 import javax.validation.constraints.NotBlank;
 
 @Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class MinioParameter {

+ 65 - 0
api-common/src/main/java/api/common/util/SshUtil.java

@@ -0,0 +1,65 @@
+package api.common.util;
+
+
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.channel.ChannelExec;
+import org.apache.sshd.client.channel.ClientChannelEvent;
+import org.apache.sshd.client.future.AuthFuture;
+import org.apache.sshd.client.session.ClientSession;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * https://github.com/apache/mina-sshd
+ * https://github.com/apache/mina-sshd/blob/master/docs/client-setup.md
+ * <!-- ssh -->
+ * <dependency>
+ * <groupId>org.apache.sshd</groupId>
+ * <artifactId>sshd-core</artifactId>
+ * <version>${sshd-core.version}</version>
+ * </dependency>
+ */
+public class SshUtil {
+
+    private static final int port = 22;
+
+    /**
+     *
+     * @param hostname  主机名或 IP
+     * @param username  用户名
+     * @param password  密码
+     * @param command   命令
+     * @return  执行结果
+     * @throws IOException  异常
+     */
+    public static String execute(String hostname, String username, String password, String command) throws IOException {
+        String result;
+
+        SshClient client = SshClient.setUpDefaultClient();
+        client.start();
+        // using the client for multiple sessions...
+        try (ClientSession session = client.connect(username, hostname, port).verify(10000).getSession()) {
+            session.addPasswordIdentity(password); // for password-based authentication
+            AuthFuture verify = session.auth().verify(10000);
+            if (verify.isFailure()) {
+                throw new RuntimeException("------- SSH 用户名密码验证失败!");
+            }
+            ChannelExec execChannel = session.createExecChannel(command);
+            // 创建输出流
+            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(2048);
+            execChannel.setOut(byteArrayOutputStream);
+            execChannel.open();
+            execChannel.waitFor(Collections.singleton(ClientChannelEvent.CLOSED), 0);
+            execChannel.close();
+            // 结果写入
+            result = byteArrayOutputStream.toString();
+            byteArrayOutputStream.close();
+        }
+        client.stop();
+        return result;
+    }
+
+
+}

+ 1 - 7
pom.xml

@@ -55,6 +55,7 @@
         <okhttp.version>4.9.3</okhttp.version>
         <kubernetes.version>14.0.0</kubernetes.version>
         <docker-java.version>3.2.13</docker-java.version>
+
     </properties>
 
 
@@ -71,7 +72,6 @@
                 <version>${logback-classic.version}</version>
             </dependency>
 
-
             <!-- docker 客户端 -->
             <dependency>
                 <groupId>com.github.docker-java</groupId>
@@ -79,12 +79,6 @@
                 <version>${docker-java.version}</version>
             </dependency>
 
-            <!-- ssh  -->
-            <dependency>
-                <groupId>ch.ethz.ganymed</groupId>
-                <artifactId>ganymed-ssh2</artifactId>
-                <version>${ganymed-ssh2.version}</version>
-            </dependency>
 
             <!-- kubernetes 客户端 -->
             <dependency>

+ 8 - 8
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -52,17 +52,17 @@ public class ManualProjectConsumer {
     @Autowired
     SensorOgtMapper sensorOgtMapper;
     @Autowired
-    private CommonService commonService;
+     CommonService commonService;
     @Autowired
-    private AlgorithmMapper algorithmMapper;
+     AlgorithmMapper algorithmMapper;
     @Autowired
-    private ApiClient apiClient;
+     ApiClient apiClient;
     @Value("${spring.kafka.consumer.topic.manual-project}")
-    private String manualProjectTopic;
+     String manualProjectTopic;
     @Value("${scheduler.manual-project.result-path-minio}")
-    private String manualProjectResultPath;
-    @Value("${scheduler.temp-path-linux}")
-    private String tempPath;
+     String manualProjectResultPath;
+    @Value("${scheduler.manual-project.algorithm-tar-path-linux}")
+     String algorithmTarPathLinux;
 
 
     @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${spring.kafka.consumer.topic.manual-project}")
@@ -194,7 +194,7 @@ public class ManualProjectConsumer {
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
         //4-1 根据算法 id 获取算法文件地址
         String minioPath = algorithmMapper.selectMinioPathById(algorithmId);
-        String linuxPath = tempPath + minioPath.substring(1);
+        String linuxPath = algorithmTarPathLinux + minioPath.substring(1);
         // 下载算法文件到本地( 2 到仓库服务器)
         Response response = commonService.download(new MinioParameter(minioPath));
         InputStream inputStream = response.body().asInputStream();

+ 11 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskMapper.java

@@ -6,7 +6,6 @@ import org.apache.ibatis.annotations.*;
 import org.apache.ibatis.type.JdbcType;
 import org.springframework.web.bind.annotation.RequestParam;
 
-import java.sql.Timestamp;
 import java.util.List;
 
 /**
@@ -22,7 +21,8 @@ public interface TaskMapper {
             @Result(column = "scene_name", property = "sceneName", jdbcType = JdbcType.VARCHAR),
             @Result(column = "scene_type", property = "sceneType", jdbcType = JdbcType.VARCHAR),
             @Result(column = "run_state", property = "runState", jdbcType = JdbcType.VARCHAR),
-            @Result(column = "run_result", property = "runResult", jdbcType = JdbcType.VARCHAR)
+            @Result(column = "run_result", property = "runResult", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "max_simulation_time", property = "maxSimulationTime", jdbcType = JdbcType.VARCHAR),
     })
     @Select("select id,\n" +
             "       p_id,\n" +
@@ -64,4 +64,13 @@ public interface TaskMapper {
             "    modify_user_id  = #{task.modifyUserId},\n" +
             "    modify_time     = #{task.modifyTime}")
     void updateByScoreResult(@Param("task") TaskPO task);
+
+    @ResultMap("task")
+    @Select("select smpt.id, smpt.p_id, smp.max_simulation_time\n" +
+            "from simulation_manual_project_task smpt\n" +
+            "         left join simulation_manual_project smp on smpt.p_id = smp.id\n" +
+            "where smpt.is_deleted = '0'\n" +
+            "  and smp.is_deleted = '0'\n" +
+            "  and smpt.run_state in ('Running', 'Analysis', 'Analysing')")
+    List<TaskPO> selectExecuting();
 }

+ 1 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/po/TaskPO.java

@@ -20,4 +20,5 @@ public class TaskPO extends CommonPO {
     private Double score;
     private String targetEvaluate;
     private String scoreExplain;
+    private Long maxSimulationTime;
 }

+ 4 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/to/ScoreTO.java

@@ -10,8 +10,8 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 @AllArgsConstructor
 public class ScoreTO {
-    private String unitSceneID;
-    private Double unitSceneScore;
-    private String evaluateItem;
-    private String scoreDescription;
+    private String unit_scene_ID;
+    private Double unit_scene_score;
+    private String evaluate_item;
+    private String score_description;
 }

+ 36 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/TickScheduler.java

@@ -1,7 +1,43 @@
 package com.css.simulation.resource.scheduler.scheduler;
 
+import api.common.pojo.constants.DictConstants;
+import api.common.pojo.param.RedisParameter;
+import api.common.util.TimeUtil;
+import com.css.simulation.resource.scheduler.feign.CommonService;
+import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 @Component
 public class TickScheduler {
+
+    @Value("${spring.kafka.consumer.topic.manual-project}")
+    private String manualProjectTopic;
+    @Autowired
+    private CommonService commonService;
+
+    @Autowired
+    private TaskMapper taskMapper;
+
+    public void tick() {
+        //1 查询出所有执行中的任务(除了 等待中 和 已完成)
+        List<TaskPO> executingTaskList = taskMapper.selectExecuting();
+        //2 根据 key 查出任务的心跳时间
+        executingTaskList.forEach(task -> {
+            String taskId = task.getId();
+            String projectId = task.getPId();
+            Long maxSimulationTime = task.getMaxSimulationTime();
+            long tickTime = Long.parseLong(commonService.get(RedisParameter.builder().key(manualProjectTopic + ":" + projectId + ":" + taskId).build()).getInfo());
+            if (TimeUtil.getNow()-tickTime > maxSimulationTime){
+                //3 判断如果心跳时间距离当前时间已经超时,则修改任务状态为失败,并销毁 pod
+                taskMapper.updateState(taskId, DictConstants.TASK_ABORTED);
+
+            }
+        });
+
+    }
 }

+ 33 - 10
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -1,10 +1,9 @@
 package com.css.simulation.resource.scheduler.service;
 
 import api.common.pojo.constants.DictConstants;
+import api.common.pojo.param.MinioParameter;
 import api.common.pojo.param.RedisParameter;
-import api.common.util.CollectionUtil;
-import api.common.util.StringUtil;
-import api.common.util.TimeUtil;
+import api.common.util.*;
 import com.css.simulation.resource.scheduler.feign.CommonService;
 import com.css.simulation.resource.scheduler.manager.TaskIndexManager;
 import com.css.simulation.resource.scheduler.manager.TaskManager;
@@ -17,12 +16,14 @@ import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskIndexPO;
 import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
 import com.css.simulation.resource.scheduler.pojo.to.ScoreTO;
-import com.css.simulation.resource.scheduler.util.ScoreUtil;
+import feign.Response;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -47,6 +48,16 @@ public class TaskService {
     private IndexTemplateMapper indexTemplateMapper;
     @Value("${spring.kafka.consumer.topic.manual-project}")
     private String manualProjectTopic;
+    @Value("${scheduler.manual-project.score.hostname}")
+    static String hostname;
+    @Value("${scheduler.manual-project.score.username}")
+    static String username;
+    @Value("${scheduler.manual-project.score.password}")
+    static String password;
+    @Value("${scheduler.manual-project.score.py-path}")
+    static String pyPath;
+    @Value("${scheduler.manual-project.result-path-linux}")
+    String manualProjectResultPathLinux;
 
     public void taskTick(String taskId) {
         // 刷新 redis 心跳时间
@@ -99,14 +110,26 @@ public class TaskService {
                     .filter(task1 -> sceneIdSet.contains(task1.getSceneId()))
                     .mapToDouble(task2 -> {
                         // 计算每个任务的得分
-                        ScoreTO score = ScoreUtil.score(task2.getRunResult(), ruleDetails);
-                        task2.setReturnSceneId(score.getUnitSceneID());
-                        task2.setScore(score.getUnitSceneScore());
-                        task2.setTargetEvaluate(score.getEvaluateItem());
-                        task2.setScoreExplain(score.getScoreDescription());
+                        ScoreTO score = new ScoreTO();
+                        try {
+                            String runResultMinio = task2.getRunResult();
+                            String runResultLinux = manualProjectResultPathLinux + runResultMinio.substring(1);
+                            Response download = commonService.download(MinioParameter.builder().objectName(runResultMinio).build());
+                            Response.Body body = download.body();
+                            InputStream inputStream = body.asInputStream();
+                            FileUtil.writeInputStreamToLocalFile(inputStream, runResultLinux);
+                            String executeResult = SshUtil.execute(hostname, username, password, "python3 " + pyPath + " " + runResultLinux + " " + task2.getSceneType());
+                            score = JsonUtil.jsonToBean(executeResult, ScoreTO.class);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        task2.setReturnSceneId(score.getUnit_scene_ID());
+                        task2.setScore(score.getUnit_scene_score());
+                        task2.setTargetEvaluate(score.getEvaluate_item());
+                        task2.setScoreExplain(score.getScore_description());
                         task2.setModifyUserId(USER_ID);
                         task2.setModifyTime(TimeUtil.getNowForMysql());
-                        return score.getUnitSceneScore();
+                        return score.getUnit_scene_score();
                     }).sum();
             long notStandardSceneNum = taskList.stream()    // 计算不合格的任务数(不到100分就是不合格)
                     .filter(task1 -> sceneIdSet.contains(task1.getSceneId()) && task1.getScore() < 100)

+ 5 - 2
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ScoreUtil.java

@@ -7,10 +7,13 @@ import com.css.simulation.resource.scheduler.pojo.to.ScoreTO;
  */
 public class ScoreUtil {
 
-    public static ScoreTO score(Object taskResult, String scoreRule) {
+
+
+
+    public static ScoreTO score(Object taskResult, String sceneType) {
+
         return new ScoreTO();
     }
 
 
-
 }