|
@@ -1,212 +1,212 @@
|
|
|
-package com.css.simulation.resource.scheduler.scheduler;
|
|
|
-
|
|
|
-import api.common.pojo.constants.DictConstants;
|
|
|
-import api.common.pojo.dto.ProjectMessageDTO;
|
|
|
-import api.common.util.CollectionUtil;
|
|
|
-import api.common.util.JsonUtil;
|
|
|
-import api.common.util.StringUtil;
|
|
|
-import api.common.util.TimeUtil;
|
|
|
-import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
|
|
|
-import com.css.simulation.resource.scheduler.consumer.ProjectConsumer;
|
|
|
-import com.css.simulation.resource.scheduler.mapper.*;
|
|
|
-import com.css.simulation.resource.scheduler.pojo.po.ClusterPO;
|
|
|
-import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
|
|
|
-import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
|
|
|
-import com.css.simulation.resource.scheduler.pojo.po.UserPO;
|
|
|
-import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
|
|
|
-import com.css.simulation.resource.scheduler.service.TaskService;
|
|
|
-import com.css.simulation.resource.scheduler.util.ProjectUtil;
|
|
|
-import com.css.simulation.resource.scheduler.util.RedisUtil;
|
|
|
-import lombok.SneakyThrows;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springframework.beans.factory.annotation.Value;
|
|
|
-import org.springframework.data.redis.core.StringRedisTemplate;
|
|
|
-import org.springframework.scheduling.annotation.Scheduled;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-import org.springframework.transaction.annotation.Transactional;
|
|
|
-
|
|
|
-import javax.annotation.Resource;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Set;
|
|
|
-
|
|
|
-@Component
|
|
|
-@Slf4j
|
|
|
-public class ProjectScheduler {
|
|
|
-
|
|
|
- @Value("${scheduler.host.hostname}")
|
|
|
- String hostname;
|
|
|
- @Value("${scheduler.host.username}")
|
|
|
- String username;
|
|
|
- @Value("${scheduler.host.password}")
|
|
|
- String password;
|
|
|
- // -------------------------------- Comment --------------------------------
|
|
|
- @Resource
|
|
|
- StringRedisTemplate stringRedisTemplate;
|
|
|
- @Resource
|
|
|
- TaskService taskService;
|
|
|
- @Resource
|
|
|
- TaskMapper taskMapper;
|
|
|
- @Resource
|
|
|
- ClusterMapper clusterMapper;
|
|
|
- @Resource
|
|
|
- ManualProjectMapper manualProjectMapper;
|
|
|
- @Resource
|
|
|
- AutoSubProjectMapper autoSubProjectMapper;
|
|
|
- @Resource
|
|
|
- ProjectConsumer projectConsumer;
|
|
|
- @Resource
|
|
|
- ProjectUtil projectUtil;
|
|
|
- @Resource
|
|
|
- UserMapper userMapper;
|
|
|
- @Resource
|
|
|
- KubernetesConfiguration kubernetesConfiguration;
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * 调度项目启动
|
|
|
- */
|
|
|
- @Scheduled(fixedDelay = 60 * 1000)
|
|
|
- @SneakyThrows
|
|
|
- public void dispatchProject() {
|
|
|
-
|
|
|
- //1 查询已经在页面上点击运行的项目(后面会判断是排队中还是已经与运行了)
|
|
|
- List<ProjectPO> manualProjectList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
|
|
|
- List<ProjectPO> autoSubProjectList = autoSubProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
|
|
|
- List<ProjectPO> allProject = new ArrayList<>();
|
|
|
- allProject.addAll(manualProjectList);
|
|
|
- allProject.addAll(autoSubProjectList);
|
|
|
-
|
|
|
-
|
|
|
- for (ProjectPO project : allProject) {
|
|
|
- String projectId = project.getId();
|
|
|
- int parallelism = Integer.parseInt(project.getParallelism());
|
|
|
- String userId = project.getCreateUserId();
|
|
|
- UserPO userPO = userMapper.selectById(userId);
|
|
|
- String roleCode = userPO.getRoleCode();
|
|
|
- String useType = userPO.getUseType();
|
|
|
- ClusterPO clusterPO = null;
|
|
|
- String clusterId;
|
|
|
- boolean isSystem = false;
|
|
|
- if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
- clusterId = DictConstants.SYSTEM_CLUSTER_ID;
|
|
|
- isSystem = true;
|
|
|
- } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
- clusterPO = clusterMapper.selectByUserId(userId);
|
|
|
- if (clusterPO == null) {
|
|
|
- log.error("ProjectScheduler--dispatchProject 项目 " + projectId + " 的创建用户 " + userId + " 没有分配集群!");
|
|
|
- return;
|
|
|
- }
|
|
|
- clusterId = clusterPO.getId();
|
|
|
- } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
|
- if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) { //3-3 普通子账户,根据自己的独占节点排队
|
|
|
- clusterPO = clusterMapper.selectByUserId(userId);
|
|
|
- } else { //3-4 共享子账户,根据父账户的共享节点排队
|
|
|
- String parentUserId = userPO.getCreateUserId();
|
|
|
- clusterPO = clusterMapper.selectByUserId(parentUserId);
|
|
|
- }
|
|
|
- if (clusterPO == null) {
|
|
|
- log.error("ProjectScheduler--dispatchProject 项目 " + projectId + " 的创建用户 " + userId + " 没有分配集群!");
|
|
|
- return;
|
|
|
- }
|
|
|
- clusterId = clusterPO.getId();
|
|
|
- } else {
|
|
|
- log.error("ProjectConsumer--dispatchProject 项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
|
|
|
- return;
|
|
|
- }
|
|
|
- PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
|
|
|
- // -------------------------------- 判断项目是否已经在执行,如果执行则 continue --------------------------------
|
|
|
- if (StringUtil.isNotEmpty(stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()))) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- // -------------------------------- 项目没有执行说明等待中 --------------------------------
|
|
|
- if (isSystem) { // 系统管理员直接执行
|
|
|
- run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
|
|
|
- return;
|
|
|
- }
|
|
|
- int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
|
|
|
- // 获取该用户正在运行的项目数量
|
|
|
- Set<String> keySetOfAllProjectOfCluster = RedisUtil.getKeySetByPrefix(stringRedisTemplate, redisPrefix.getClusterRunningPrefix());
|
|
|
- List<String> keyListOfRunningProject = null;
|
|
|
- // cluster:${clusterId}:running:${projectId}
|
|
|
- if (CollectionUtil.isNotEmpty(keySetOfAllProjectOfCluster)) {
|
|
|
- keyListOfRunningProject = projectUtil.getRunningProjectList(keySetOfAllProjectOfCluster); // 筛选出运行中的项目信息 (key 为 cluster:${cluster}:running:${projectId})
|
|
|
- if (CollectionUtil.isNotEmpty(keyListOfRunningProject)) {
|
|
|
- log.info("ProjectScheduler--dispatchProject 运行中的项目的 key 有:" + keyListOfRunningProject);
|
|
|
- long parallelismSum = 0;
|
|
|
- for (String keyOfRunningProject : keyListOfRunningProject) {
|
|
|
- parallelismSum += JsonUtil.jsonToBean(RedisUtil.getStringByKey(stringRedisTemplate, keyOfRunningProject), ProjectMessageDTO.class).getParallelism();
|
|
|
- }
|
|
|
- if (parallelismSum < simulationLicenseNumber) { // 已经运行的项目小于集群允许运行的项目则,运行新的项目
|
|
|
- if (parallelismSum + parallelism < simulationLicenseNumber) {
|
|
|
- run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if ((CollectionUtil.isEmpty(keySetOfAllProjectOfCluster) || CollectionUtil.isEmpty(keyListOfRunningProject)) && parallelism < simulationLicenseNumber) {
|
|
|
- run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @SneakyThrows
|
|
|
- public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey) {
|
|
|
- ProjectMessageDTO projectMessageDTO;
|
|
|
- try {
|
|
|
- projectMessageDTO = JsonUtil.jsonToBean(RedisUtil.getStringByKey(stringRedisTemplate, projectWaitingKey), ProjectMessageDTO.class);
|
|
|
- } catch (Exception e) {
|
|
|
- log.info("ProjectScheduler--run 等待执行的项目信息已经从 redis 删除。");
|
|
|
- return;
|
|
|
- }
|
|
|
- //1 获取所有节点的剩余可用并行度
|
|
|
- int restParallelism = projectUtil.getRestParallelism();
|
|
|
- if (restParallelism == 0) {
|
|
|
- log.info("ProjectScheduler--run 集群中没有可用并行度,项目 " + projectId + " 继续等待。");
|
|
|
- return;
|
|
|
- }
|
|
|
- //2 判断剩余可用并行度是否大于项目并行度,否则继续等待
|
|
|
- if (restParallelism > 0L) {
|
|
|
- log.info("ProjectScheduler--run 集群 " + clusterId + " 执行项目项目 " + projectId);
|
|
|
- projectMessageDTO.setCurrentParallelism(restParallelism); // 设置实际的并行度
|
|
|
- projectConsumer.parseProject(projectMessageDTO, projectRunningKey);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * 处理 pod 超时
|
|
|
- * 同时也可处理 pod 莫名关闭,因为关闭之后也会超时
|
|
|
- */
|
|
|
- @Scheduled(fixedDelay = 60 * 1000)
|
|
|
- @Transactional
|
|
|
- public void taskTimeout() {
|
|
|
- //1 查询任务信息(需要过滤已经删除的项目,因为页面上删除项目不会删除任务)
|
|
|
- List<TaskPO> executingTaskList = taskMapper.selectByRunStateAndProjectIsNotDeleted(DictConstants.TASK_RUNNING);
|
|
|
- if (CollectionUtil.isEmpty(executingTaskList)) {
|
|
|
- return;
|
|
|
- }
|
|
|
- log.info("ProjectScheduler--taskTimeout 正在运行的任务有:" + executingTaskList);
|
|
|
- for (TaskPO task : executingTaskList) {
|
|
|
- String userId = task.getCreateUserId();
|
|
|
- String projectId = task.getPId();
|
|
|
- String taskId = task.getId();
|
|
|
- PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaskId(userId, projectId, taskId);
|
|
|
- // 获取心跳时间
|
|
|
- String tickTime = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskTickKey());
|
|
|
- if (StringUtil.isEmpty(tickTime)) {
|
|
|
- log.error(redisPrefix.getTaskTickKey() + ",该 key 的心跳时间为空!");
|
|
|
- continue;
|
|
|
- }
|
|
|
- long lastTickTime = Long.parseLong(tickTime);
|
|
|
- // 如果心跳超时则更改任务状态为 Aborted
|
|
|
- if (TimeUtil.getNow() - lastTickTime > Long.parseLong(kubernetesConfiguration.getPodTimeout())) {
|
|
|
- String podName = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskPodKey());
|
|
|
- taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-}
|
|
|
+//package com.css.simulation.resource.scheduler.scheduler;
|
|
|
+//
|
|
|
+//import api.common.pojo.constants.DictConstants;
|
|
|
+//import api.common.pojo.dto.ProjectMessageDTO;
|
|
|
+//import api.common.util.CollectionUtil;
|
|
|
+//import api.common.util.JsonUtil;
|
|
|
+//import api.common.util.StringUtil;
|
|
|
+//import api.common.util.TimeUtil;
|
|
|
+//import com.css.simulation.resource.scheduler.configuration.kubernetes.KubernetesConfiguration;
|
|
|
+//import com.css.simulation.resource.scheduler.consumer.ProjectConsumer;
|
|
|
+//import com.css.simulation.resource.scheduler.mapper.*;
|
|
|
+//import com.css.simulation.resource.scheduler.pojo.po.ClusterPO;
|
|
|
+//import com.css.simulation.resource.scheduler.pojo.po.ProjectPO;
|
|
|
+//import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
|
|
|
+//import com.css.simulation.resource.scheduler.pojo.po.UserPO;
|
|
|
+//import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
|
|
|
+//import com.css.simulation.resource.scheduler.service.TaskService;
|
|
|
+//import com.css.simulation.resource.scheduler.util.ProjectUtil;
|
|
|
+//import com.css.simulation.resource.scheduler.util.RedisUtil;
|
|
|
+//import lombok.SneakyThrows;
|
|
|
+//import lombok.extern.slf4j.Slf4j;
|
|
|
+//import org.springframework.beans.factory.annotation.Value;
|
|
|
+//import org.springframework.data.redis.core.StringRedisTemplate;
|
|
|
+//import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+//import org.springframework.stereotype.Component;
|
|
|
+//import org.springframework.transaction.annotation.Transactional;
|
|
|
+//
|
|
|
+//import javax.annotation.Resource;
|
|
|
+//import java.util.ArrayList;
|
|
|
+//import java.util.List;
|
|
|
+//import java.util.Set;
|
|
|
+//
|
|
|
+//@Component
|
|
|
+//@Slf4j
|
|
|
+//public class ProjectScheduler {
|
|
|
+//
|
|
|
+// @Value("${scheduler.host.hostname}")
|
|
|
+// String hostname;
|
|
|
+// @Value("${scheduler.host.username}")
|
|
|
+// String username;
|
|
|
+// @Value("${scheduler.host.password}")
|
|
|
+// String password;
|
|
|
+// // -------------------------------- Comment --------------------------------
|
|
|
+// @Resource
|
|
|
+// StringRedisTemplate stringRedisTemplate;
|
|
|
+// @Resource
|
|
|
+// TaskService taskService;
|
|
|
+// @Resource
|
|
|
+// TaskMapper taskMapper;
|
|
|
+// @Resource
|
|
|
+// ClusterMapper clusterMapper;
|
|
|
+// @Resource
|
|
|
+// ManualProjectMapper manualProjectMapper;
|
|
|
+// @Resource
|
|
|
+// AutoSubProjectMapper autoSubProjectMapper;
|
|
|
+// @Resource
|
|
|
+// ProjectConsumer projectConsumer;
|
|
|
+// @Resource
|
|
|
+// ProjectUtil projectUtil;
|
|
|
+// @Resource
|
|
|
+// UserMapper userMapper;
|
|
|
+// @Resource
|
|
|
+// KubernetesConfiguration kubernetesConfiguration;
|
|
|
+//
|
|
|
+//
|
|
|
+// /**
|
|
|
+// * 调度项目启动
|
|
|
+// */
|
|
|
+// @Scheduled(fixedDelay = 60 * 1000)
|
|
|
+// @SneakyThrows
|
|
|
+// public void dispatchProject() {
|
|
|
+//
|
|
|
+// //1 查询已经在页面上点击运行的项目(后面会判断是排队中还是已经与运行了)
|
|
|
+// List<ProjectPO> manualProjectList = manualProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
|
|
|
+// List<ProjectPO> autoSubProjectList = autoSubProjectMapper.selectByNowRunState(DictConstants.PROJECT_RUNNING);
|
|
|
+// List<ProjectPO> allProject = new ArrayList<>();
|
|
|
+// allProject.addAll(manualProjectList);
|
|
|
+// allProject.addAll(autoSubProjectList);
|
|
|
+//
|
|
|
+//
|
|
|
+// for (ProjectPO project : allProject) {
|
|
|
+// String projectId = project.getId();
|
|
|
+// int parallelism = Integer.parseInt(project.getParallelism());
|
|
|
+// String userId = project.getCreateUserId();
|
|
|
+// UserPO userPO = userMapper.selectById(userId);
|
|
|
+// String roleCode = userPO.getRoleCode();
|
|
|
+// String useType = userPO.getUseType();
|
|
|
+// ClusterPO clusterPO = null;
|
|
|
+// String clusterId;
|
|
|
+// boolean isSystem = false;
|
|
|
+// if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
+// clusterId = DictConstants.SYSTEM_CLUSTER_ID;
|
|
|
+// isSystem = true;
|
|
|
+// } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
+// clusterPO = clusterMapper.selectByUserId(userId);
|
|
|
+// if (clusterPO == null) {
|
|
|
+// log.error("dispatchProject 项目 " + projectId + " 的创建用户 " + userId + " 没有分配集群!");
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// clusterId = clusterPO.getId();
|
|
|
+// } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
|
+// if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) { //3-3 普通子账户,根据自己的独占节点排队
|
|
|
+// clusterPO = clusterMapper.selectByUserId(userId);
|
|
|
+// } else { //3-4 共享子账户,根据父账户的共享节点排队
|
|
|
+// String parentUserId = userPO.getCreateUserId();
|
|
|
+// clusterPO = clusterMapper.selectByUserId(parentUserId);
|
|
|
+// }
|
|
|
+// if (clusterPO == null) {
|
|
|
+// log.error("dispatchProject 项目 " + projectId + " 的创建用户 " + userId + " 没有分配集群!");
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// clusterId = clusterPO.getId();
|
|
|
+// } else {
|
|
|
+// log.error("ProjectConsumer--dispatchProject 项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// PrefixTO redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
|
|
|
+// // -------------------------------- 判断项目是否已经在执行,如果执行则 continue --------------------------------
|
|
|
+// if (StringUtil.isNotEmpty(stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey()))) {
|
|
|
+// continue;
|
|
|
+// }
|
|
|
+// // -------------------------------- 项目没有执行说明等待中 --------------------------------
|
|
|
+// if (isSystem) { // 系统管理员直接执行
|
|
|
+// run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// int simulationLicenseNumber = clusterPO.getNumSimulationLicense();
|
|
|
+// // 获取该用户正在运行的项目数量
|
|
|
+// Set<String> keySetOfAllProjectOfCluster = RedisUtil.getKeySetByPrefix(stringRedisTemplate, redisPrefix.getClusterRunningPrefix());
|
|
|
+// List<String> keyListOfRunningProject = null;
|
|
|
+// // cluster:${clusterId}:running:${projectId}
|
|
|
+// if (CollectionUtil.isNotEmpty(keySetOfAllProjectOfCluster)) {
|
|
|
+// keyListOfRunningProject = projectUtil.getRunningProjectList(keySetOfAllProjectOfCluster); // 筛选出运行中的项目信息 (key 为 cluster:${cluster}:running:${projectId})
|
|
|
+// if (CollectionUtil.isNotEmpty(keyListOfRunningProject)) {
|
|
|
+// log.info("dispatchProject 运行中的项目的 key 有:" + keyListOfRunningProject);
|
|
|
+// long parallelismSum = 0;
|
|
|
+// for (String keyOfRunningProject : keyListOfRunningProject) {
|
|
|
+// parallelismSum += JsonUtil.jsonToBean(RedisUtil.getStringByKey(stringRedisTemplate, keyOfRunningProject), ProjectMessageDTO.class).getParallelism();
|
|
|
+// }
|
|
|
+// if (parallelismSum < simulationLicenseNumber) { // 已经运行的项目小于集群允许运行的项目则,运行新的项目
|
|
|
+// if (parallelismSum + parallelism < simulationLicenseNumber) {
|
|
|
+// run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// if ((CollectionUtil.isEmpty(keySetOfAllProjectOfCluster) || CollectionUtil.isEmpty(keyListOfRunningProject)) && parallelism < simulationLicenseNumber) {
|
|
|
+// run(clusterId, projectId, redisPrefix.getProjectWaitingKey(), redisPrefix.getProjectRunningKey());
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+//
|
|
|
+// @SneakyThrows
|
|
|
+// public void run(String clusterId, String projectId, String projectWaitingKey, String projectRunningKey) {
|
|
|
+// ProjectMessageDTO projectMessageDTO;
|
|
|
+// try {
|
|
|
+// projectMessageDTO = JsonUtil.jsonToBean(RedisUtil.getStringByKey(stringRedisTemplate, projectWaitingKey), ProjectMessageDTO.class);
|
|
|
+// } catch (Exception e) {
|
|
|
+// log.info("run 等待执行的项目信息已经从 redis 删除。");
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// //1 获取所有节点的剩余可用并行度
|
|
|
+// int restParallelism = projectUtil.getRestParallelism();
|
|
|
+// if (restParallelism == 0) {
|
|
|
+// log.info("run 集群中没有可用并行度,项目 " + projectId + " 继续等待。");
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// //2 判断剩余可用并行度是否大于项目并行度,否则继续等待
|
|
|
+// if (restParallelism > 0L) {
|
|
|
+// log.info("run 集群 " + clusterId + " 执行项目项目 " + projectId);
|
|
|
+// projectMessageDTO.setCurrentParallelism(restParallelism); // 设置实际的并行度
|
|
|
+// projectConsumer.parseProject(projectMessageDTO, projectRunningKey);
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+//
|
|
|
+// /**
|
|
|
+// * 处理 pod 超时
|
|
|
+// * 同时也可处理 pod 莫名关闭,因为关闭之后也会超时
|
|
|
+// */
|
|
|
+// @Scheduled(fixedDelay = 60 * 1000)
|
|
|
+// @Transactional
|
|
|
+// public void taskTimeout() {
|
|
|
+// //1 查询任务信息(需要过滤已经删除的项目,因为页面上删除项目不会删除任务)
|
|
|
+// List<TaskPO> executingTaskList = taskMapper.selectByRunStateAndProjectIsNotDeleted(DictConstants.TASK_RUNNING);
|
|
|
+// if (CollectionUtil.isEmpty(executingTaskList)) {
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// log.info("taskTimeout 正在运行的任务有:" + executingTaskList);
|
|
|
+// for (TaskPO task : executingTaskList) {
|
|
|
+// String userId = task.getCreateUserId();
|
|
|
+// String projectId = task.getPId();
|
|
|
+// String taskId = task.getId();
|
|
|
+// PrefixTO redisPrefix = projectUtil.getRedisPrefixByUserIdAndProjectIdAndTaskId(userId, projectId, taskId);
|
|
|
+// // 获取心跳时间
|
|
|
+// String tickTime = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskTickKey());
|
|
|
+// if (StringUtil.isEmpty(tickTime)) {
|
|
|
+// log.error(redisPrefix.getTaskTickKey() + ",该 key 的心跳时间为空!");
|
|
|
+// continue;
|
|
|
+// }
|
|
|
+// long lastTickTime = Long.parseLong(tickTime);
|
|
|
+// // 如果心跳超时则更改任务状态为 Aborted
|
|
|
+// if (TimeUtil.getNow() - lastTickTime > Long.parseLong(kubernetesConfiguration.getPodTimeout())) {
|
|
|
+// String podName = stringRedisTemplate.opsForValue().get(redisPrefix.getTaskPodKey());
|
|
|
+// taskService.taskState(taskId, DictConstants.TASK_ABORTED, podName);
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+//}
|