LingxinMeng 2 лет назад
Родитель
Сommit
32bd93728d

+ 10 - 5
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -313,23 +313,28 @@ public class ProjectConsumer {
         String roleCode = userEntity.getRoleCode();
         String useType = userEntity.getUseType();
         ClusterEntity clusterEntity;
-        String clusterUserId = projectUserId;  // 项目实际运行使用的用户集群
+        String clusterUserId;  // 项目实际运行使用的用户集群
         if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
+            clusterUserId = DictConstants.SYSTEM_CLUSTER_ID;
             log.info("项目 " + projectId + " 的创建人 " + projectUserId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
             PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
-            run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, modelType, DictConstants.SYSTEM_USER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
+            run(projectMessageDTO, clusterUserId, modelType, DictConstants.SYSTEM_USER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
             return;
         } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
+            clusterUserId = projectUserId;
             clusterEntity = clusterMapper.selectByUserId(clusterUserId);
             log.info("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterEntity);
         } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
             if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) {   //3-3 普通子账户,根据自己的独占节点排队
+                clusterUserId = projectUserId;
                 clusterEntity = clusterMapper.selectByUserId(clusterUserId);
                 log.info("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通独占子账户(自己的集群),集群为:" + clusterEntity);
-            } else {    //3-4 共享子账户,根据父账户的共享节点排队
+            } else if (DictConstants.USER_TYPE_PUBLIC.equals(useType)) {    //3-4 共享子账户,根据父账户的共享节点排队
                 clusterUserId = userEntity.getCreateUserId();
                 clusterEntity = clusterMapper.selectByUserId(clusterUserId);
                 log.info("项目 " + projectId + " 的创建人 " + projectUserId + " 为普通共享子账户(父账户的集群),集群为:" + clusterEntity);
+            } else {
+                throw new RuntimeException("用户" + projectUserId + "未知占用类型:" + useType);
             }
         } else {
             throw new RuntimeException("未知角色类型:" + roleCode);
@@ -341,7 +346,7 @@ public class ProjectConsumer {
 //            int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
             // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
             if (projectUtil.getUsingLicenseNumber(clusterUserId, DictConstants.MODEL_TYPE_VTD) + parallelism <= clusterEntity.getNumSimulationLicense()) {
-                run(projectMessageDTO, projectUserId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
+                run(projectMessageDTO, clusterUserId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
             } else {
                 log.info("VTD 项目 " + projectId + " 并行度超出账户允许,加入等待队列,暂不执行。 ");
                 wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
@@ -352,7 +357,7 @@ public class ProjectConsumer {
 //                int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
             // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
             if (projectUtil.getUsingLicenseNumber(clusterUserId, DictConstants.MODEL_TYPE_VTD) + parallelism <= clusterEntity.getNumSimulationLicense() && projectUtil.getUsingLicenseNumber(clusterUserId, DictConstants.MODEL_TYPE_CARSIM) + parallelism <= clusterEntity.getNumDynamicLicense()) {
-                run(projectMessageDTO,clusterUserId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
+                run(projectMessageDTO, clusterUserId, modelType, clusterEntity.getId(), redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
             } else {
                 log.info("CARSIM 项目 " + projectId + " 并行度超出账户允许,加入等待队列,暂不执行。 ");
                 wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);

+ 16 - 21
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -1,17 +1,12 @@
 package com.css.simulation.resource.scheduler.scheduler;
 
-import api.common.util.CollectionUtil;
-import com.css.simulation.resource.scheduler.util.ProjectUtil;
-import com.css.simulation.resource.scheduler.util.RedisUtil;
 import com.css.simulation.resource.scheduler.consumer.ProjectConsumer;
+import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
-import java.util.List;
 
 @Component
 @Slf4j
@@ -23,20 +18,20 @@ public class ProjectScheduler {
     @Resource
     private ProjectUtil projectUtil;
 
-    /**
-     * 调度项目启动
-     */
-    @Scheduled(fixedDelay = 1000)
-    public void dispatchProject() {
-        List<String> projectMessageKeys = projectUtil.getWaitingProjectMessageKeys();
-        if (!CollectionUtil.isEmpty(projectMessageKeys)) {
-            log.info("尝试启动等待中的项目:" + projectMessageKeys);
-            for (String projectMessageKey : projectMessageKeys) {
-                final String projectMessage = RedisUtil.getStringByKey(stringRedisTemplate, projectMessageKey);
-                ConsumerRecord<String, String> projectRecord = new ConsumerRecord<>("", 0, 0L, null, projectMessage);
-                projectConsumer.acceptMessage(projectRecord);
-            }
-        }
-    }
+//    /**
+//     * 调度项目启动
+//     */
+//    @Scheduled(fixedDelay = 1000)
+//    public void dispatchProject() {
+//        List<String> projectMessageKeys = projectUtil.getWaitingProjectMessageKeys();
+//        if (!CollectionUtil.isEmpty(projectMessageKeys)) {
+//            log.info("尝试启动等待中的项目:" + projectMessageKeys);
+//            for (String projectMessageKey : projectMessageKeys) {
+//                final String projectMessage = RedisUtil.getStringByKey(stringRedisTemplate, projectMessageKey);
+//                ConsumerRecord<String, String> projectRecord = new ConsumerRecord<>("", 0, 0L, null, projectMessage);
+//                projectConsumer.acceptMessage(projectRecord);
+//            }
+//        }
+//    }
 
 }

+ 2 - 43
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -5,7 +5,6 @@ import api.common.pojo.constants.DictConstants;
 import api.common.pojo.dto.ProjectMessageDTO;
 import api.common.util.*;
 import com.css.simulation.resource.scheduler.configuration.custom.CustomConfiguration;
-import com.css.simulation.resource.scheduler.configuration.feign.WebServerClient;
 import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
 import com.css.simulation.resource.scheduler.configuration.redis.CustomRedisClient;
 import com.css.simulation.resource.scheduler.data.entity.NodeEntity;
@@ -63,8 +62,6 @@ public class ProjectUtil {
     @Resource
     private CustomRedisClient customRedisClient;
     @Resource
-    private WebServerClient webServerClient;
-    @Resource
     private CustomConfiguration customConfiguration;
     @Resource
     private CloseableHttpClient closeableHttpClient;
@@ -222,34 +219,6 @@ public class ProjectUtil {
         throw new RuntimeException("不存在项目:" + projectId);
     }
 
-
-//    /**
-//     * 获取正在运行的项目的并行度总和
-//     *
-//     * @param clusterRunningPrefix 集群 key 前缀
-//     * @return 正在运行的项目的并行度总和
-//     */
-//    @SneakyThrows
-//    public int getCurrentParallelismSum(String clusterRunningPrefix) {
-//        int result = 0;
-//        Set<String> clusterRunningKeySet = stringRedisTemplate.keys(clusterRunningPrefix + "*");
-//        List<String> runningProjectSet; // 运行中的 projectId 列表
-//        if (CollectionUtil.isEmpty(clusterRunningKeySet)) {
-//            return 0;
-//        }
-//        runningProjectSet = getRunningProjectList(clusterRunningKeySet);
-//        if (CollectionUtil.isEmpty(runningProjectSet)) {
-//            return 0;
-//        }
-//        for (String projectKey : runningProjectSet) {
-//            String projectJsonTemp = stringRedisTemplate.opsForValue().get(projectKey);
-//            ProjectMessageDTO projectMessageTemp = JsonUtil.jsonToBean(projectJsonTemp, ProjectMessageDTO.class);
-//            result += projectMessageTemp.getCurrentParallelism();   // 获取当前正在使用的并行度
-//        }
-//        return result;
-//    }
-
-
     /**
      * 节点剩余可用并行度列表
      *
@@ -518,16 +487,6 @@ public class ProjectUtil {
     }
 
 
-//    /**
-//     * 获取 projectId 列表
-//     *
-//     * @param clusterRunningKeySet 集群下的所有键值对(包括运行中的项目和等待中的项目)
-//     * @return projectId 列表
-//     */
-//    public List<String> getRunningProjectList(Set<String> clusterRunningKeySet) {
-//        return clusterRunningKeySet.stream().filter(key -> StringUtil.countSubString(key, ":") == 3).collect(Collectors.toList());
-//    }
-
 
     public PrefixEntity getRedisPrefixByProjectIdAndProjectType(String projectId, String projectType) {
         String userId;
@@ -798,7 +757,7 @@ public class ProjectUtil {
         params.put("id", projectId);
         String result = HttpUtil.post(closeableHttpClient, requestConfig, customConfiguration.getProjectDetailsUri(), headers, params);
         log.info("访问仿真云平台项目详情接口:" + customConfiguration.getProjectDetailsUri() + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + result);
-        ResponseBodyVO responseBodyVO = JsonUtil.jsonToBean(result, ResponseBodyVO.class);
+        ResponseBodyVO<?> responseBodyVO = JsonUtil.jsonToBean(result, ResponseBodyVO.class);
         String projectDetailsVOJson = JsonUtil.beanToJson(responseBodyVO.getInfo());
         TimeUnit.SECONDS.sleep(5);
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
@@ -827,7 +786,7 @@ public class ProjectUtil {
         params.put("id", projectId);
         String result = HttpUtil.post(closeableHttpClient, requestConfig, customConfiguration.getProjectReportUri(), headers, params);
         log.info("访问仿真云平台项目报告接口:" + customConfiguration.getProjectReportUri() + ",请求头为:" + headers + ",请求体为:" + params + "结果为:" + result);
-        ResponseBodyVO responseBodyVO = JsonUtil.jsonToBean(result, ResponseBodyVO.class);
+        ResponseBodyVO<?> responseBodyVO = JsonUtil.jsonToBean(result, ResponseBodyVO.class);
         String projectReportVOJson = JsonUtil.beanToJson(responseBodyVO.getInfo());
         TimeUnit.SECONDS.sleep(5);
         if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {