martin 2 years ago
parent
commit
fdf227422b

+ 33 - 18
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -105,24 +105,24 @@ public class ProjectConsumer {
         String useType = userPO.getUseType();
         ClusterPO clusterPO;
         if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
-            log.info("ProjectConsumer--cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接执行!");
+            log.info("ProjectConsumer--cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
             PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
-            boolean run = run(DictConstants.SYSTEM_CLUSTER_ID, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
-            if (!run) {
-                wait(DictConstants.SYSTEM_CLUSTER_ID, projectId, redisPrefix.getProjectWaitingKey(), projectJson);
-            }
+            run(DictConstants.SYSTEM_CLUSTER_ID, projectId, projectType, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), projectJson, parallelism);
             return;
         } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
             clusterPO = clusterMapper.selectByUserId(userId);
+            log.info("ProjectConsumer--cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterPO);
         } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
             if (DictConstants.USE_TYPE_EXCLUSIVE.equals(useType)) {   //3-3 普通子账户,根据自己的独占节点排队
                 clusterPO = clusterMapper.selectByUserId(userId);
+                log.info("ProjectConsumer--cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为普通独占子账户(自己的集群),集群为:" + clusterPO);
             } else {    //3-4 共享子账户,根据父账户的共享节点排队
                 String parentUserId = userPO.getCreateUserId();
                 clusterPO = clusterMapper.selectByUserId(parentUserId);
+                log.info("ProjectConsumer--cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为普通共享子账户(父账户的集群),集群为:" + clusterPO);
             }
         } else {
-            log.error("ProjectConsumer--dispatchProject 项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
+            log.error("ProjectConsumer--cacheManualProject 项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
             return;
         }
         // 获取拥有的节点数量,即仿真软件证书数量
@@ -133,12 +133,12 @@ public class ProjectConsumer {
         Set<String> clusterRunningKeySet = stringRedisTemplate.keys(redisPrefix.getClusterRunningPrefix() + "*");
         List<String> runningProjectSet;
         if (CollectionUtil.isEmpty(clusterRunningKeySet)) {
-            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
+            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), projectJson, parallelism);
             return;
         }
         runningProjectSet = projectUtil.getRunningProjectList(clusterRunningKeySet);
         if (CollectionUtil.isEmpty(runningProjectSet)) {
-            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
+            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), projectJson, parallelism);
             return;
         }
         // 计算正在运行的项目的并行度总和
@@ -150,33 +150,48 @@ public class ProjectConsumer {
         }
         // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
         if (parallelismSum + parallelism <= simulationLicenseNumber) {
-            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), projectJson, parallelism);
+            run(clusterId, projectId, projectType, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey(), projectJson, parallelism);
         } else {
             wait(clusterId, projectId, redisPrefix.getProjectWaitingKey(), projectJson);
         }
     }
 
-    public boolean run(String clusterId, String projectId, String projectType, String projectRunningKey, String projectJson, long parallelism) {
+    /**
+     * @param clusterId
+     * @param projectId
+     * @param projectType
+     * @param projectRunningKey
+     * @param projectJson
+     * @param parallelism
+     * @return
+     */
+    public void run(String clusterId, String projectId, String projectType, String projectRunningKey, String projectWaitingKey, String projectJson, long parallelism) {
 
         //1 获取一个剩余可用并行度最大的节点
-        KubernetesNodeTO maxParallelismPNodeTO = projectUtil.getMaxParallelismNode();
-        String maxRestParallelismNode = maxParallelismPNodeTO.getName();
-        Long maxRestParallelism = maxParallelismPNodeTO.getMaxParallelism();
+        KubernetesNodeTO maxParallelismNodeTO = projectUtil.getMaxParallelismNode();
+        String maxRestParallelismNode = maxParallelismNodeTO.getName();
+        long maxRestParallelism = maxParallelismNodeTO.getMaxParallelism();
+        log.info("ProjectConsumer--run 准备在节点 " + maxParallelismNodeTO + " 执行项目 " + projectId + "。");
 
         //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
-        if (maxRestParallelism > parallelism) {
+        if (maxRestParallelism >= parallelism) {
             log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
             parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, parallelism);
-            return true;
-        } else if (maxRestParallelism > 0) {
+        } else if (maxRestParallelism > 0L) {
             log.info("ProjectConsumer--run 集群 " + clusterId + " 将项目 " + projectId + "在节点" + maxRestParallelismNode + " 执行!");
             parseProject(projectId, projectType, projectJson, "cluster:" + clusterId, projectRunningKey, maxRestParallelismNode, maxRestParallelism);
-            return true;
         } else {
-            return false;
+            wait(clusterId, projectId, projectWaitingKey, projectJson);
+            log.info("ProjectConsumer--cacheManualProject 服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
         }
     }
 
+    /**
+     * @param clusterId
+     * @param projectId
+     * @param projectWaitingKey
+     * @param projectJson
+     */
     public void wait(String clusterId, String projectId, String projectWaitingKey, String projectJson) {
         log.info("ProjectConsumer--wait 集群 " + clusterId + " 将项目 " + projectId + " 放入等待队列!");
         stringRedisTemplate.opsForValue().set(projectWaitingKey, projectJson);
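
Note on the change above: run() no longer returns a boolean for the caller to test; it now receives the waiting key and queues the project itself when no node has free capacity. A minimal, self-contained sketch of that decision (class and name choices here are illustrative, not from the repository):

    final class DispatchSketch {
        enum Decision { RUN_FULL, RUN_CLAMPED, WAIT }

        // Mirrors the branch added in ProjectConsumer.run: run at the requested
        // parallelism if the freest node covers it, clamp to the free amount if
        // the node still has some capacity, otherwise fall back to the waiting queue.
        static Decision decide(long freeParallelism, long requestedParallelism) {
            if (freeParallelism >= requestedParallelism) {
                return Decision.RUN_FULL;
            } else if (freeParallelism > 0L) {
                return Decision.RUN_CLAMPED;
            } else {
                return Decision.WAIT;
            }
        }

        public static void main(String[] args) {
            System.out.println(decide(8L, 4L)); // RUN_FULL
            System.out.println(decide(2L, 4L)); // RUN_CLAMPED
            System.out.println(decide(0L, 4L)); // WAIT
        }
    }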

+ 9 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -439,6 +439,15 @@ public class TaskManager {
         SshUtil.execute(clientSession, "kubectl delete job project-" + projectId);
         clientSession.close();
         sshClient.stop();
+
+        // 归还并行度
+        String nodeOfProject = "project:" + projectId + ":node";
+        String restParallelismKey = "node:" + stringRedisTemplate.opsForValue().get(nodeOfProject) + ":parallelism";
+        String usedParallelismKey = "project:" + projectId + ":parallelism";
+        long restParallelism = Long.parseLong(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(restParallelismKey)));
+        long usedParallelism = Long.parseLong(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(usedParallelismKey)));
+        stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism + usedParallelism) + "");
+
     }
 
 }
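
The block added above gives a finished project's parallelism back to the node it ran on, using three Redis keys: project:&lt;id&gt;:node, project:&lt;id&gt;:parallelism and node:&lt;name&gt;:parallelism. A self-contained sketch with a plain Map standing in for Redis (key names follow the diff; everything else is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    final class ParallelismReturnSketch {
        static void returnParallelism(Map<String, String> redis, String projectId) {
            String nodeName = redis.get("project:" + projectId + ":node");
            String restKey = "node:" + nodeName + ":parallelism";
            long rest = Long.parseLong(redis.get(restKey));
            long used = Long.parseLong(redis.get("project:" + projectId + ":parallelism"));
            redis.put(restKey, String.valueOf(rest + used)); // give the capacity back to the node
        }

        public static void main(String[] args) {
            Map<String, String> redis = new HashMap<>();
            redis.put("project:p1:node", "worker-1");
            redis.put("project:p1:parallelism", "4");
            redis.put("node:worker-1:parallelism", "2");
            returnParallelism(redis, "p1");
            System.out.println(redis.get("node:worker-1:parallelism")); // 6
        }
    }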

+ 5 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/IndexMapper.java

@@ -73,10 +73,11 @@ public interface IndexMapper {
             "      (select scene\n" +
             "       from simulation_manual_project smp\n" +
             "       where id = #{projectId})\n" +
-            "and (scene_natural_ids like #{idExtend}\n" +
-            "    or scene_traffic_ids like #{idExtend}\n" +
-            "    or scene_statue_ids like #{idExtend}\n" +
-            "    )\n")
+            "         and (scene_natural_ids like #{idExtend}\n" +
+            "           or scene_traffic_ids like #{idExtend}\n" +
+            "           or scene_statue_ids like #{idExtend}\n" +
+            "           or scene_generalization_ids like #{idExtend}\n" +
+            "           )")
     List<String> selectLeafIndexIdByProjectAndSceneId(@Param("projectId") String projectId, @Param("idExtend") String idExtend);
 
 

+ 1 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/SceneMapper.java

@@ -117,5 +117,5 @@ public interface SceneMapper {
             "       </foreach>\n" +
             "</if>\n" +
             "</script>")
-    List<ScenePO> selectGeneralizationByIdList(List<String> generalizationIdList);
+    List<ScenePO> selectGeneralizationByIdList(@Param("idList")List<String> idList);
 }
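
The @Param("idList") annotation is what lets the <script> body (not shown in this diff) reference the collection by name in its <foreach>; without it, MyBatis exposes a single List argument only under its default names (collection/list). A hedged sketch of how such a mapper method is typically wired (table and column names are assumptions, not from the repository; ScenePO is the repository's own type):

    import java.util.List;
    import org.apache.ibatis.annotations.Param;
    import org.apache.ibatis.annotations.Select;

    public interface SceneSketchMapper {
        // Hypothetical query: the real @Select script is outside this diff hunk.
        @Select("<script>" +
                "select id, scene_name from scene_generalization where id in " +
                "<foreach collection='idList' item='id' open='(' separator=',' close=')'>" +
                "#{id}" +
                "</foreach>" +
                "</script>")
        List<ScenePO> selectGeneralizationByIdList(@Param("idList") List<String> idList);
    }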

+ 7 - 5
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -224,17 +224,19 @@ public class ProjectScheduler {
             PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectId(userId, projectId);
             String lastNowString = stringRedisTemplate.opsForValue().get(redisPrefix.getProjectCheckKey());
             // 获取该项目的 job 中正在运行的 pod 数量
-            List<String> allPodList = KubernetesUtil.getPod(apiClient, "");
-            long taskNumber = allPodList.stream().filter(podName -> podName.contains(projectId)).count();
+            List<String> allJobList = KubernetesUtil.getJob(apiClient, "default");
+            List<String> allPodList = KubernetesUtil.getPod(apiClient, "default");
+            long jobNumber = allJobList.stream().filter(jobName -> jobName.contains(projectId)).count();
+            long podNumber = allPodList.stream().filter(podName -> podName.contains(projectId)).count();
             // 如果没有检查过且 pod 列表为空,则正式开始检查,设置第一次检查时间
-            if (StringUtil.isEmpty(lastNowString) && taskNumber == 0L) {
+            if (StringUtil.isEmpty(lastNowString) && jobNumber == 1L && podNumber == 0L) {
                 log.info("ProjectScheduler--projectCheck 开始检查项目 " + projectId);
                 stringRedisTemplate.opsForValue().set(redisPrefix.getProjectCheckKey(), TimeUtil.getNowString());
                 return;
             }
-            log.info("ProjectScheduler--projectCheck kubernetes 的命名空间 default 中正在运行的 pod 有:" + allPodList + ",其中项目 " + projectId + " 的任务个数为:" + taskNumber);
+            log.info("ProjectScheduler--projectCheck kubernetes 的命名空间 default 中正在运行的 pod 有:" + allPodList + ",其中项目 " + projectId + " 的任务个数为:" + podNumber);
             //  如果两次检查时间超过了 2 分钟,且仍然没有 pod 执行,则准备重启
-            if (StringUtil.isNotEmpty(lastNowString) && taskNumber == 0L && Long.parseLong(TimeUtil.getNowString()) - Long.parseLong(lastNowString) > (long) 120 * 1000) {
+            if (StringUtil.isNotEmpty(lastNowString) && jobNumber == 1L && podNumber == 0L && Long.parseLong(TimeUtil.getNowString()) - Long.parseLong(lastNowString) > (long) 120 * 1000) {
                 // 删除检查
                 stringRedisTemplate.delete(redisPrefix.getProjectCheckKey());
                 try {
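
The check above now distinguishes "job exists but has no running pods" (a stalled project worth restarting) from "job already gone". A self-contained sketch of the restart condition (constant and parameter names are illustrative):

    final class ProjectCheckSketch {
        static final long TWO_MINUTES_MS = 2L * 60L * 1000L;

        // Restart only if the first-check timestamp was recorded, the project's
        // Kubernetes job still exists, none of its pods are running, and more
        // than two minutes have passed since the first check.
        static boolean shouldRestart(long jobNumber, long podNumber, Long firstCheckMillis, long nowMillis) {
            return firstCheckMillis != null
                    && jobNumber == 1L
                    && podNumber == 0L
                    && nowMillis - firstCheckMillis > TWO_MINUTES_MS;
        }

        public static void main(String[] args) {
            long now = System.currentTimeMillis();
            System.out.println(shouldRestart(1L, 0L, now - 3L * 60L * 1000L, now)); // true
            System.out.println(shouldRestart(1L, 2L, now - 3L * 60L * 1000L, now)); // false: pods are running
            System.out.println(shouldRestart(0L, 0L, null, now));                   // false: nothing to check yet
        }
    }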

+ 34 - 20
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -77,11 +77,15 @@ public class ProjectService {
     // -------------------------------- Comment --------------------------------
 
     @Transactional
-    public void prepare(String clusterPrefix, String projectId, String projectType,String projectRunningKey,String projectJson, String nodeName, long parallelism) {
+    public void prepare(String clusterPrefix, String projectId, String projectType, String projectRunningKey, String projectJson, String nodeName, long parallelism) {
         //1 将指定 node 的并行度减少
-        String restParallelismKey = "node:" + nodeName;
+        String restParallelismKey = "node:" + nodeName + ":parallelism";
+        String usedParallelismKey = "project:" + projectId + ":parallelism";
+        String nodeOfProject = "project:" + projectId + ":node";
         long restParallelism = Long.parseLong(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(restParallelismKey)));// 剩余可用并行度
         stringRedisTemplate.opsForValue().set(restParallelismKey, (restParallelism - parallelism) + "");
+        stringRedisTemplate.opsForValue().set(usedParallelismKey, parallelism + "");
+        stringRedisTemplate.opsForValue().set(nodeOfProject, nodeName);
         //2 将 redis 中该项目旧的信息则直接删除(包括 waitingKey)
         Set<String> oldKeys = stringRedisTemplate.keys(clusterPrefix + "*");
         if (CollectionUtil.isNotEmpty(oldKeys)) {
@@ -148,8 +152,19 @@ public class ProjectService {
     }
 
 
+    /**
+     * @param projectRunningPrefix
+     * @param userId
+     * @param projectId
+     * @param maxSimulationTime
+     * @param scenePOSet
+     * @param vehiclePO
+     * @param cameraPOList
+     * @param ogtPOList
+     */
     public void sendTaskMessage(String projectRunningPrefix, String userId, String projectId, Long maxSimulationTime, Set<ScenePO> scenePOSet, VehiclePO vehiclePO, List<CameraPO> cameraPOList, List<OgtPO> ogtPOList) {
         final int[] messageNumber = {0};
+        log.info("ProjectService--sendTaskMessage 项目 " + projectId + " 获得的包括的场景信息为:" + scenePOSet);
         for (ScenePO scenePO : scenePOSet) {
             String sceneId = scenePO.getId();
             //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
@@ -234,7 +249,6 @@ public class ProjectService {
                 stringRedisTemplate.opsForValue().set(taskMessagePrefix, finalTaskJson);
                 kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
                     // 消息发送到的topic
-                    assert success != null;
                     String topic = success.getRecordMetadata().topic();
                     // 消息发送到的分区
                     int partition = success.getRecordMetadata().partition();
@@ -245,13 +259,11 @@ public class ProjectService {
                             + "分区 partition 为:" + partition + "\n"
                             + "偏移量为:" + offset + "\n"
                             + "消息体为:" + finalTaskJson);
-                }, failure -> {
-                    log.error("------- 发送消息失败:" + failure.getMessage());
-                });
+                }, failure -> log.error("------- 发送消息失败:" + failure.getMessage()));
                 messageNumber[0] = messageNumber[0] + 1;
             });
         }
-        log.info("ProjectConsumer 共发送了" + messageNumber[0] + " 条消息!");
+        log.info("ProjectService--sendTaskMessage 共发送了 " + messageNumber[0] + " 条消息!");
     }
 
 
@@ -374,19 +386,21 @@ public class ProjectService {
 
     @SneakyThrows
     public void stopProject(String projectId, String type) {
-        if (DictConstants.PROJECT_TYPE_MANUAL.equals(type)) {
-            manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql());
-        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(type)) {
-            autoSubProjectMapper.updateNowRunStateAndFinishTimeById(DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql(), projectId);
-        }
-        KubernetesUtil.deleteJob(apiClient, "default", "project-" + projectId);
-        PrefixTO redisPrefix = projectUtil.getRedisPrefixByProjectIdAndProjectType(projectId, type);
-        Set<String> keys = stringRedisTemplate.keys(redisPrefix.getProjectRunningKey() + "*");
-        if (CollectionUtil.isNotEmpty(keys)) {
-            for (String key : keys) {
-                stringRedisTemplate.delete(key);
-            }
-        }
+
+        // 需要判断项目在执行还是在等待
+//        if (DictConstants.PROJECT_TYPE_MANUAL.equals(type)) {
+//            manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql());
+//        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(type)) {
+//            autoSubProjectMapper.updateNowRunStateAndFinishTimeById(DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql(), projectId);
+//        }
+//        KubernetesUtil.deleteJob(apiClient, "default", "project-" + projectId);
+//        PrefixTO redisPrefix = projectUtil.getRedisPrefixByProjectIdAndProjectType(projectId, type);
+//        Set<String> keys = stringRedisTemplate.keys(redisPrefix.getProjectRunningKey() + "*");
+//        if (CollectionUtil.isNotEmpty(keys)) {
+//            for (String key : keys) {
+//                stringRedisTemplate.delete(key);
+//            }
+//        }
     }
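
prepare(...) above is the counterpart of the TaskManager change: it takes parallelism away from the chosen node and records, per project, how much was taken and on which node, so it can be returned later. A self-contained sketch with a Map standing in for Redis (key names follow the diff; everything else is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    final class ParallelismReserveSketch {
        static void reserve(Map<String, String> redis, String projectId, String nodeName, long parallelism) {
            String restKey = "node:" + nodeName + ":parallelism";
            long rest = Long.parseLong(redis.get(restKey));
            redis.put(restKey, String.valueOf(rest - parallelism));                          // take capacity from the node
            redis.put("project:" + projectId + ":parallelism", String.valueOf(parallelism)); // remember how much was taken
            redis.put("project:" + projectId + ":node", nodeName);                           // remember which node runs it
        }

        public static void main(String[] args) {
            Map<String, String> redis = new HashMap<>();
            redis.put("node:worker-1:parallelism", "8");
            reserve(redis, "p1", "worker-1", 3L);
            System.out.println(redis); // node:worker-1:parallelism=5, plus the two project:* keys
        }
    }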
 
 

+ 4 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -46,12 +46,13 @@ public class ProjectUtil {
 
     public KubernetesNodeTO getMaxParallelismNode() {
         List<KubernetesNodeTO> nodeList = kubernetesConfiguration.getNodeList();
+        log.info("ProjectUtil--getMaxParallelismNode kubernetes 节点列表为:" + nodeList);
         String maxRestParallelismNode = "master";
         long maxRestParallelism = 0L;
         for (KubernetesNodeTO kubernetesNodeTO : nodeList) {
             String name = kubernetesNodeTO.getName();
-            Long maxParallelism = kubernetesNodeTO.getMaxParallelism();
-            String restParallelismKey = "node:" + name;
+            long maxParallelism = kubernetesNodeTO.getMaxParallelism();
+            String restParallelismKey = "node:" + name + ":parallelism";
             String restParallelismString = stringRedisTemplate.opsForValue().get(restParallelismKey);
             long restParallelism;
             if (restParallelismString == null) {    // 如果剩余可用并行度没有值,说明是第一次查询,则重置成最大剩余可用并行度
@@ -60,7 +61,7 @@ public class ProjectUtil {
             } else {
                 restParallelism = Long.parseLong(restParallelismString);
             }
-            if (restParallelism > maxParallelism) {
+            if (restParallelism >= maxRestParallelism) {
                 maxRestParallelism = restParallelism;
                 maxRestParallelismNode = name;
             }
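
The comparison fix above matters: the old code compared a node's remaining parallelism to that node's own configured maximum (restParallelism > maxParallelism), a condition that can essentially never hold, so the winner was never updated; the new code compares against the running maximum of remaining parallelism across nodes. A self-contained sketch of the corrected selection (node names are illustrative):

    import java.util.LinkedHashMap;
    import java.util.Map;

    final class MaxNodeSketch {
        // Keep the node with the largest remaining parallelism; ">=" ensures a node
        // is still chosen (the last one seen) even when every remaining value is 0.
        static String pickNode(Map<String, Long> restParallelismByNode) {
            String best = "master";
            long bestRest = 0L;
            for (Map.Entry<String, Long> entry : restParallelismByNode.entrySet()) {
                if (entry.getValue() >= bestRest) {
                    bestRest = entry.getValue();
                    best = entry.getKey();
                }
            }
            return best;
        }

        public static void main(String[] args) {
            Map<String, Long> rest = new LinkedHashMap<>();
            rest.put("worker-1", 2L);
            rest.put("worker-2", 6L);
            rest.put("worker-3", 0L);
            System.out.println(pickNode(rest)); // worker-2
        }
    }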

+ 2 - 2
simulation-resource-scheduler/src/main/resources/bootstrap-dev.yaml

@@ -2,9 +2,9 @@ spring:
   cloud:
     nacos:
       discovery:
-        server-addr: 47.94.105.148:8848
+        server-addr: 47.93.135.21:8848
         namespace: 3698bfc2-a612-487a-b2a2-aaad16cd9d9d
       config:
-        server-addr: 47.94.105.148:8848
+        server-addr: 47.93.135.21:8848
         namespace: 3698bfc2-a612-487a-b2a2-aaad16cd9d9d
         file-extension: yaml

+ 1 - 1
simulation-resource-scheduler/src/main/resources/bootstrap.yaml

@@ -6,4 +6,4 @@ spring:
   application:
     name: simulation-resource-scheduler
   profiles:
-    active: test
+    active: dev