martin 2 years ago
parent
commit
dc77fdb7c6

+ 24 - 18
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -11,6 +11,8 @@ import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.service.ProjectService;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -221,24 +223,28 @@ public class ProjectConsumer {
 
     }
 
-//    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")
-//    @SneakyThrows
-//    public void stopManualProject(ConsumerRecord<String, String> stopRecord) {
-//        log.info("ProjectConsumer--stopManualProject 接收到的项目终止消息为:" + stopRecord);
-//        //1 读取 kafka 的项目停止信息
-//        /*
-//            {
-//                "projectId": "sadfasdfs",	// 项目 id
-//            }
-//         */
-//        String json = stopRecord.value();
-//        ObjectMapper objectMapper = new ObjectMapper();
-//        JsonNode jsonNode = objectMapper.readTree(json);
-//        String projectId = jsonNode.path("projectId").asText();
-//        manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql());
-//        LinuxUtil.execute("kubectl delete job project-" + projectId);
-//        redisTemplate.delete(manualProjectTopic + ":" + projectId + ":check");
-//    }
+    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")
+    @SneakyThrows
+    public void stopProject(ConsumerRecord<String, String> stopRecord) {
+        log.info("ProjectConsumer--stopManualProject 接收到的项目终止消息为:" + stopRecord);
+        //1 读取 kafka 的项目停止信息
+        /*
+            {
+                "projectId": "sadfasdfs",	// 项目 id
+                "type": "1",	// 项目 id
+            }
+         */
+
+        String json = stopRecord.value();
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode jsonNode = objectMapper.readTree(json);
+        String projectId = jsonNode.path("projectId").asText();
+        String type = jsonNode.path("type").asText();
+        projectService.stopProject(projectId,type);
+
+
+
+    }
 
 
 }

+ 8 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/mapper/AutoSubProjectMapper.java

@@ -6,6 +6,8 @@ import org.apache.ibatis.annotations.Param;
 import org.apache.ibatis.annotations.Select;
 import org.apache.ibatis.annotations.Update;
 
+import java.sql.Timestamp;
+
 @Mapper
 public interface AutoSubProjectMapper {
 
@@ -25,4 +27,10 @@ public interface AutoSubProjectMapper {
             "set now_run_state = #{nowRunState}\n" +
             "where id = #{id}")
     void updateNowRunStateById(@Param("nowRunState") String nowRunState, @Param("id") String id);
+
+    @Update("update simulation_automatic_subproject\n" +
+            "set now_run_state  = #{nowRunState},\n" +
+            "    finish_time = #{finishTime}\n" +
+            "where id = #{id}")
+    void updateNowRunStateAndFinishTimeById(@Param("nowRunState") String state, @Param("finishTime") Timestamp finishTime, @Param("id") String id);
 }

+ 28 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -7,9 +7,11 @@ import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.*;
 import com.css.simulation.resource.scheduler.util.KubernetesUtil;
 import com.css.simulation.resource.scheduler.util.MinioUtil;
+import com.css.simulation.resource.scheduler.util.ProjectUtil;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.kubernetes.client.openapi.ApiClient;
 import io.minio.MinioClient;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -71,6 +73,10 @@ public class ProjectService {
     SceneMapper sceneMapper;
     @Autowired
     AlgorithmMapper algorithmMapper;
+    @Autowired
+    ApiClient apiClient;
+    @Autowired
+    ProjectUtil projectUtil;
 
     // -------------------------------- Comment --------------------------------
 
@@ -320,11 +326,12 @@ public class ProjectService {
 
     /**
      * 运行
+     *
      * @param jobTemplateYamlPathSource 模板文件
-     * @param projectId 项目id
-     * @param algorithmDockerImage 算法镜像
-     * @param completions 完成度
-     * @param parallelism 并行度
+     * @param projectId                 项目id
+     * @param algorithmDockerImage      算法镜像
+     * @param completions               完成度
+     * @param parallelism               并行度
      * @param jobTemplateYamlPathTarget 执行文件
      */
     @SneakyThrows
@@ -347,4 +354,21 @@ public class ProjectService {
         //  启动
         KubernetesUtil.applyYaml(hostname, username, password, jobTemplateYamlPathTarget);
     }
+
+    @SneakyThrows
+    public void stopProject(String projectId, String type) {
+        if (DictConstants.PROJECT_TYPE_MANUAL.equals(type)) {
+            manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql());
+        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(type)) {
+            autoSubProjectMapper.updateNowRunStateAndFinishTimeById(DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql(), projectId);
+        }
+        KubernetesUtil.deleteJob(apiClient, "default", "project-" + projectId);
+        PrefixTO redisPrefix = projectUtil.getRedisPrefixByProjectIdAndProjectType(projectId, type);
+        Set<String> keys = stringRedisTemplate.keys(redisPrefix.getProjectRunningKey() + "*");
+        if (CollectionUtil.isNotEmpty(keys)) {
+            for (String key : keys) {
+                stringRedisTemplate.delete(key);
+            }
+        }
+    }
 }

+ 51 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/util/ProjectUtil.java

@@ -2,7 +2,9 @@ package com.css.simulation.resource.scheduler.util;
 
 import api.common.pojo.constants.DictConstants;
 import api.common.util.StringUtil;
+import com.css.simulation.resource.scheduler.mapper.AutoSubProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.ClusterMapper;
+import com.css.simulation.resource.scheduler.mapper.ManualProjectMapper;
 import com.css.simulation.resource.scheduler.mapper.UserMapper;
 import com.css.simulation.resource.scheduler.pojo.po.UserPO;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
@@ -22,6 +24,10 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ProjectUtil {
 
+    @Autowired
+    ManualProjectMapper manualProjectMapper;
+    @Autowired
+    AutoSubProjectMapper autoSubProjectMapper;
     @Autowired
     UserMapper userMapper;
     @Autowired
@@ -106,7 +112,6 @@ public class ProjectUtil {
                 .projectWaitingKey(projectWaitingKey)
                 .projectCheckKey(projectCheckKey)
                 .build();
-
     }
 
 
@@ -139,4 +144,49 @@ public class ProjectUtil {
     }
 
 
+    public PrefixTO getRedisPrefixByProjectIdAndProjectType(String projectId, String projectType) {
+        String userId;
+        if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
+            userId = manualProjectMapper.selectCreateUserById(projectId);
+        } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
+            userId = autoSubProjectMapper.selectCreateUserById(projectId);
+        } else {
+            throw new RuntimeException("PrjectUtil--getRedisPrefixByProjectIdAndProjectType 未知的项目类型!");
+        }
+
+        //3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
+        UserPO userPO = userMapper.selectById(userId);
+        String roleCode = userPO.getRoleCode();
+        String useType = userPO.getUseType();
+        String clusterId;
+        if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) {  //3-1 管理员账户和管理员子账户直接执行
+            clusterId = DictConstants.SYSTEM_CLUSTER_ID;
+        } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
+            clusterId = clusterMapper.selectByUserId(userId).getId();
+        } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
+            if (DictConstants.USE_TYPE_EXCLUSIVE.equals(useType)) {   //3-3 普通子账户,根据自己的独占节点排队
+                clusterId = clusterMapper.selectByUserId(userId).getId();
+            } else {    //3-4 共享子账户,根据父账户的共享节点排队
+                String parentUserId = userPO.getCreateUserId();
+                clusterId = clusterMapper.selectByUserId(parentUserId).getId();
+            }
+        } else {
+            throw new RuntimeException("ProjectUtil--getRedisPrefixByUserIdAndProjectIdAndTaksId 未知账户类型,无法获取集群信息!");
+        }
+        String clusterPrefix = "cluster:" + clusterId;
+        String clusterRunningPrefix = clusterPrefix + ":running";
+        String clusterWaitingPrefix = clusterPrefix + ":waiting";
+        String projectRunningKey = clusterRunningPrefix + ":" + projectId;
+        String projectWaitingKey = clusterWaitingPrefix + ":" + projectId;
+        String projectCheckKey = projectRunningKey + ":check";
+
+        return PrefixTO.builder()
+                .clusterPrefix(clusterPrefix)
+                .clusterRunningPrefix(clusterRunningPrefix)
+                .clusterWaitingPrefix(clusterWaitingPrefix)
+                .projectRunningKey(projectRunningKey)
+                .projectWaitingKey(projectWaitingKey)
+                .projectCheckKey(projectCheckKey)
+                .build();
+    }
 }