martin 3 vuotta sitten
vanhempi
commit
80b19714db

+ 40 - 17
api-common/src/main/java/api/common/util/SshUtil.java

@@ -30,20 +30,26 @@ import java.util.*;
 public class SshUtil {
 
     private static final int port = 22;
-    private static final SshClient client = SshClient.setUpDefaultClient();
 
 
     /**
-     * @param hostname 主机名或 IP
-     * @param username 用户名
-     * @param password 密码
+     *
+     */
+    public static SshClient getClient() {
+        return SshClient.setUpDefaultClient();
+    }
+
+    /**
+     * @param sshClient ssh 客户端对象
+     * @param hostname  主机名或 IP
+     * @param username  用户名
+     * @param password  密码
      * @return 执行结果
      * @throws IOException 异常
      */
-    public static ClientSession getSession(String hostname, String username, String password) throws IOException {
-
-        client.start();
-        ClientSession session = client.connect(username, hostname, port).verify(10000).getSession();
+    public static ClientSession getSession(SshClient sshClient, String hostname, String username, String password) throws IOException {
+        sshClient.start();
+        ClientSession session = sshClient.connect(username, hostname, port).verify(10000).getSession();
         session.addPasswordIdentity(password);
         AuthFuture verify = session.auth().verify(10000);
         if (verify.isFailure()) {
@@ -56,9 +62,9 @@ public class SshUtil {
      * @param clientSession 主机名或 IP
      * @throws IOException 异常
      */
-    public static void stop(ClientSession clientSession) throws IOException {
+    public static void stop(SshClient sshClient, ClientSession clientSession) throws IOException {
         clientSession.close();
-        client.stop();
+        sshClient.stop();
     }
 
 
@@ -336,20 +342,37 @@ public class SshUtil {
     }
 
     /**
-     * 剩余可用磁盘
+     * 剩余可用磁盘(kb)
      *
      * @param session 会话
-     * @return 剩余可用磁盘
+     * @return 剩余可用磁盘(kb)
      */
     public static double diskAvailable(ClientSession session) throws IOException {
         String execute = execute(session, "df --total");
         String[] split = execute.split("\n");
         String[] split1 = split[split.length - 1].split("\\s+");
-        long totalSize = Long.parseLong(split1[1]);
-        long totalUsed = Long.parseLong(split1[2]);
-        long totalAvailable = Long.parseLong(split1[3]);
-        String totalUsedPercent = split1[4];
-        return totalAvailable;
+        long total = Long.parseLong(split1[1]);
+        long used = Long.parseLong(split1[2]);
+        long available = Long.parseLong(split1[3]);
+        String usedPercent = split1[4];
+        return available;
+    }
+
+    /**
+     * 总磁盘(kb)
+     *
+     * @param session 会话
+     * @return 总磁盘(kb)
+     */
+    public static double diskTotal(ClientSession session) throws IOException {
+        String execute = execute(session, "df --total");
+        String[] split = execute.split("\n");
+        String[] split1 = split[split.length - 1].split("\\s+");
+        long total = Long.parseLong(split1[1]);
+        long used = Long.parseLong(split1[2]);
+        long available = Long.parseLong(split1[3]);
+        String usedPercent = split1[4];
+        return total;
     }
 
 }

+ 4 - 2
simulation-resource-monitor/src/main/java/com/css/simulation/resource/monitor/scheduler/MyScheduler.java

@@ -5,6 +5,7 @@ import api.common.pojo.po.home.SystemServerPO;
 import api.common.util.SshUtil;
 import api.common.util.StringUtil;
 import lombok.Data;
+import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -23,7 +24,7 @@ public class MyScheduler {
     public void server() {
 
 
-        hostList.forEach(host ->{
+        hostList.forEach(host -> {
 
             String ip = host.getHostname();
             Integer port = host.getPort();
@@ -32,7 +33,8 @@ public class MyScheduler {
             String type = host.getType();
 
             try {
-                ClientSession session = SshUtil.getSession(ip, username, password);
+                SshClient sshClient = SshUtil.getClient();
+                ClientSession session = SshUtil.getSession(sshClient, ip, username, password);
 //                if ("gpu".equals(type)){
 //                    List<GpuDTO> gpuList = SshUtil.gpuInfo(session);
 //                    gpuList.forEach(gpu -> {

+ 8 - 13
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ManualProjectConsumer.java

@@ -219,8 +219,7 @@ public class ManualProjectConsumer {
             //4-4 将对象转成 json
             String taskJson = JsonUtil.beanToJson(taskTO);
             //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
-//            kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
-            kafkaTemplate.send("test", taskJson).addCallback(success -> {
+            kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
                 // 消息发送到的topic
                 String topic = success.getRecordMetadata().topic();
                 // 消息发送到的分区
@@ -315,23 +314,19 @@ public class ManualProjectConsumer {
         log.info("------- ManualProjectConsumer 项目 " + projectId + " 的完成度为:" + completions);
         log.info("------- ManualProjectConsumer 项目 " + projectId + " 的并行度为:" + parallelism);
         String jobTemplateYamlPathSource = "/opt/simulation-cloud/simulation-resource-scheduler/job-template/job-template.yaml";
-//        String jobTemplateYamlPathSource = "D:\\temp\\job-template.yaml";
         String jobTemplateYamlPathTarget = "/opt/simulation-cloud/simulation-resource-scheduler/job-yaml/" + "project-" + projectId + ".yaml";
         String yamlSource = FileUtil.read(jobTemplateYamlPathSource);
         log.info("------- ManualProjectConsumer 模板文件为:" + yamlSource);
         String replace0 = yamlSource.replace("job-cloud-simulation", "project-" + projectId);
         String replace1 = replace0.replace("vtd-container", "vtd-" + projectId);
         String replace2 = replace1.replace("algorithm-container", "algorithm-" + projectId);
-        int i = replace2.indexOf("completions:");
-        int j = replace2.indexOf("parallelism:");
-        StringBuilder stringBuilder = new StringBuilder(replace2);
-        stringBuilder.replace(i + "completions: ".length(), i + "completions: ".length() + 1, completions + "");
-        stringBuilder.replace(j + "parallelism: ".length(), j + "parallelism: ".length() + 1, parallelism + "");
-        String yamlTarget0 = stringBuilder.toString();
-        String yamlTarget1 = yamlTarget0.replace("apiVers1on", "apiVersion");
-        String yamlTarget2 = yamlTarget1.replace("1atch/v1", "batch/v1");
-        log.info("------- ManualProjectConsumer 开始执行 yaml 文件" + yamlTarget1);
-        FileUtil.writeStringToLocalFile(yamlTarget2, jobTemplateYamlPathTarget);
+        String replace3 = replace2.replace("projectId", projectId);
+        String replace4 = replace3.replace("completions-number", completions + "");
+        String replace5 = replace4.replace("parallelism-number", parallelism + "");
+        String replace6 = replace5.replace("apiVers1on", "apiVersion");
+        String replace7 = replace6.replace("1atch/v1", "batch/v1");
+        log.info("------- ManualProjectConsumer 开始执行 yaml 文件" + replace7);
+        FileUtil.writeStringToLocalFile(replace7, jobTemplateYamlPathTarget);
         LinuxUtil.execute("kubectl apply -f " + jobTemplateYamlPathTarget);
     }
 

+ 3 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/IndexTemplateMapper.java

@@ -22,6 +22,7 @@ public interface IndexTemplateMapper {
             @Result(column = "scene_statue_ids", property = "sceneStatueIds", jdbcType = JdbcType.VARCHAR),
             @Result(column = "weight", property = "weight", jdbcType = JdbcType.VARCHAR),
             @Result(column = "parent_id", property = "parentId", jdbcType = JdbcType.VARCHAR),
+            @Result(column = "root_id", property = "parentId", jdbcType = JdbcType.VARCHAR),
             @Result(column = "rule_name", property = "ruleName", jdbcType = JdbcType.VARCHAR),
             @Result(column = "rule_details", property = "ruleDetails", jdbcType = JdbcType.VARCHAR)
     })
@@ -58,7 +59,8 @@ public interface IndexTemplateMapper {
             "       scene_traffic_ids,\n" +
             "       scene_statue_ids,\n" +
             "       weight,\n" +
-            "       parent_id\n" +
+            "       parent_id,\n" +
+            "       root_id\n" +
             "from scene_package_sublist\n" +
             "where 1=1\n" +
             "<if test='idList != null and idList.size() > 0'>\n" +

+ 8 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/TaskIndexMapper.java

@@ -2,6 +2,7 @@ package com.css.simulation.resource.scheduler.mapper;
 
 
 import com.css.simulation.resource.scheduler.pojo.po.TaskIndexPO;
+import org.apache.ibatis.annotations.Delete;
 import org.apache.ibatis.annotations.Insert;
 import org.apache.ibatis.annotations.Mapper;
 import org.apache.ibatis.annotations.Param;
@@ -56,4 +57,11 @@ public interface TaskIndexMapper {
             "        #{total.modifyUserId},\n" +
             "        #{total.isDeleted})")
     void insertTotalIndex(@Param("total")TaskIndexPO totalTaskIndex);
+
+
+    @Delete("delete from simulation_mpt_first_target_score where p_id = #{project}")
+    void deleteFirstByProjectId(@Param("projectId")String projectId);
+
+    @Delete("delete from simulation_mpt_last_target_score where p_id = #{project}")
+    void deleteLastByProjectId(@Param("projectId")String projectId);
 }

+ 2 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/pojo/po/IndexTemplatePO.java

@@ -16,7 +16,8 @@ public class IndexTemplatePO {
     private String sceneTrafficIds;
     private String sceneStatueIds;
     private String weight;  // 权重
-    private String parentId;  // 父 id
+    private String parentId;  // 父指标 id
+    private String rootId;  // 场景测试包 id
     private String ruleName; // 打分规则名称,例如 AEB_1-1
     private String ruleDetails; // 打分规则代码
     private Double tempScore;

+ 45 - 34
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -22,6 +22,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -124,17 +125,21 @@ public class TaskService {
         }
         projectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED);   // 修改该 project 的状态为已完成
         LinuxUtil.execute("kubectl delete job project-" + projectId);
-        ClientSession sessionKafka = SshUtil.getSession(hostnameKafka, usernameKafka, passwordKafka);
-//        String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand,"topicName",projectId);
-        String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", "test");
+        SshClient clientKafka = SshUtil.getClient();
+        ClientSession sessionKafka = SshUtil.getSession(clientKafka, hostnameKafka, usernameKafka, passwordKafka);
+        String topicDeleteCommand = StringUtil.replace(kafkaDeleteCommand, "topicName", projectId);
         SshUtil.execute(sessionKafka, topicDeleteCommand);
+        SshUtil.stop(clientKafka, sessionKafka);
         List<TaskPO> taskList = taskMapper.selectTaskListByProjectId(projectId);  // 所有任务信息
+        taskIndexMapper.deleteFirstByProjectId(projectId);
+        taskIndexMapper.deleteLastByProjectId(projectId);
         // -------------------------------- 查询叶子指标 --------------------------------
         List<IndexTemplatePO> leafIndexTemplateList = indexTemplateMapper.selectLeafIndexWithRuleDetailsByPackageId(scenePackageId);
         List<TaskIndexPO> leafTaskIndexList = new ArrayList<>();
         log.info("------- /state 共有 " + leafIndexTemplateList.size() + "个叶子节点!");
 
-        ClientSession sessionScore = SshUtil.getSession(hostnameScore, usernameScore, passwordScore);
+        SshClient clientScore = SshUtil.getClient();
+        ClientSession sessionScore = SshUtil.getSession(clientScore, hostnameScore, usernameScore, passwordScore);
         for (int i = 0; i < leafIndexTemplateList.size(); i++) {
             IndexTemplatePO indexTemplatePO = leafIndexTemplateList.get(i);
             String indexId = indexTemplatePO.getIndexId();
@@ -234,29 +239,14 @@ public class TaskService {
             leafTaskIndex.setIsDeleted("0");
             leafTaskIndexList.add(leafTaskIndex);
         }
-        SshUtil.stop(sessionScore);
-
-        log.info("------- /state 根据每个指标的得分和权重算出 project 的总得分:" + leafIndexTemplateList);
-        double totalScore = compute(leafIndexTemplateList);
-        log.info("------- /state 总得分为:" + totalScore);
-        // 保存分数
+        SshUtil.stop(clientScore, sessionScore);
         // 保存任务分数
         taskManager.batchUpdateByScoreResult(taskList);
-        // 保存指标分数
+        // 保存末级指标分数
         taskIndexManager.batchInsertLeafIndex(leafTaskIndexList);
-        // 保存总分数
-        TaskIndexPO totalTaskIndex = TaskIndexPO.builder()
-                .id(StringUtil.getRandomUUID())
-                .pId(projectId)
-                .target(scenePackageId)
-                .score(totalScore)
-                .build();
-        totalTaskIndex.setCreateUserId(USER_ID);
-        totalTaskIndex.setCreateTime(TimeUtil.getNowForMysql());
-        totalTaskIndex.setModifyUserId(USER_ID);
-        totalTaskIndex.setModifyTime(TimeUtil.getNowForMysql());
-        totalTaskIndex.setIsDeleted("0");
-        taskIndexMapper.insertTotalIndex(totalTaskIndex);
+        // 保存一级指标分数
+        log.info("------- /state 根据每个指标的得分和权重算出各个一级指标的得分(即 project 对应的场景测试包下的一级指标)!");
+        computeFirst(leafIndexTemplateList, projectId);
 
         // 调用 server 的接口,计算评价等级
         String tokenUrl = tokenUri + "?grant_type=client_credentials"
@@ -285,19 +275,40 @@ public class TaskService {
     }
 
 
-    public double compute(List<IndexTemplatePO> leaf) {
-        double result = 0.0;
-        Map<String, List<IndexTemplatePO>> groups = leaf.stream().collect(Collectors.groupingBy(IndexTemplatePO::getParentId));
-        Set<String> parentIdSet = groups.keySet();
+    public void computeFirst(List<IndexTemplatePO> leaf, String projectId) {
+        Map<String, List<IndexTemplatePO>> parentIndexMap = leaf.stream().collect(Collectors.groupingBy(IndexTemplatePO::getParentId));
+        Set<String> parentIdSet = parentIndexMap.keySet();
+        List<String> parentIdList = CollectionUtil.setToList(parentIdSet);
         log.info("------- /state 将叶子指标按父指标分组父指标集合为:" + parentIdSet);
-        List<IndexTemplatePO> indexTemplatePOList = indexTemplateMapper.selectByIdList(CollectionUtil.setToList(parentIdSet));
-        indexTemplatePOList.forEach(index1 -> {
-            double sum = groups.get(index1.getIndexId()).stream().mapToDouble(index2 -> index2.getTempScore() * Double.parseDouble(index2.getWeight())).sum();
+        List<IndexTemplatePO> parentIndexList = indexTemplateMapper.selectByIdList(parentIdList);
+        // 计算父指标得分
+        parentIndexList.forEach(index1 -> {
+            double sum = parentIndexMap.get(index1.getIndexId()).stream().mapToDouble(index2 -> index2.getTempScore() * Double.parseDouble(index2.getWeight())).sum();
             index1.setTempScore(sum);
         });
-        if (indexTemplatePOList.size() > 1) {
-            result = result + compute(indexTemplatePOList);
+        Iterator<IndexTemplatePO> parentIndexIterator = parentIndexList.iterator();
+        while (parentIndexIterator.hasNext()) {
+            IndexTemplatePO parentIndex = parentIndexIterator.next();
+            String parentId = parentIndex.getParentId();
+            String rootId = parentIndex.getRootId();
+            if (parentId.equals(rootId)) {
+                TaskIndexPO totalTaskIndex = TaskIndexPO.builder()
+                        .id(StringUtil.getRandomUUID())
+                        .pId(projectId)
+                        .target(parentIndex.getIndexId())
+                        .score(parentIndex.getTempScore())
+                        .build();
+                totalTaskIndex.setCreateUserId(USER_ID);
+                totalTaskIndex.setCreateTime(TimeUtil.getNowForMysql());
+                totalTaskIndex.setModifyUserId(USER_ID);
+                totalTaskIndex.setModifyTime(TimeUtil.getNowForMysql());
+                totalTaskIndex.setIsDeleted("0");
+                taskIndexMapper.insertTotalIndex(totalTaskIndex);
+                parentIndexIterator.remove();
+            }
+        }
+        if (parentIndexList.size() > 1) {
+            computeFirst(parentIndexList, projectId);
         }
-        return result;
     }
 }

+ 3 - 1
simulation-resource-scheduler/src/main/resources/kubernetes/template/job/job-template.yaml

@@ -6,6 +6,8 @@ metadata:
   labels:
     user: EY
 spec:
+  completions: completions-number
+  parallelism: parallelism-number
   template:
     metadata:
       name: pod-cloud-simulation
@@ -18,7 +20,7 @@ spec:
         - name: vtd-container
           image: vtd.run.perception:latest
           imagePullPolicy: Never
-          command: [ "/Controller/VTDController", "/Controller/config/docker_cloud.ini" ]
+          command: [ "/Controller/VTDController", "/Controller/config/docker_cloud.ini", "projectId"]
           env:
             - name: PodName
               valueFrom: