martin hace 3 años
padre
commit
944968af30

+ 21 - 27
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -9,8 +9,6 @@ import com.css.simulation.resource.scheduler.pojo.po.*;
 import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
 import com.css.simulation.resource.scheduler.service.ManualProjectService;
 import com.css.simulation.resource.scheduler.util.ProjectUtil;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -65,15 +63,11 @@ public class ProjectConsumer {
     ManualProjectService manualProjectService;
     @Autowired
     ProjectUtil projectUtil;
-    //    @Autowired
-//    ApiClient apiClient;
-    @Value("${scheduler.manual-project.topic}")
-    String manualProjectTopic;
-    @Value("${scheduler.manual-project.result-path-minio}")
+    @Value("${scheduler.project.result-path-minio}")
     String resultPathMinio;
-    @Value("${scheduler.manual-project.job-template}")
+    @Value("${scheduler.project.job-template}")
     String jobTemplate;
-    @Value("${scheduler.manual-project.job-yaml}")
+    @Value("${scheduler.project.job-yaml}")
     String jobYaml;
 
     @Value("${scheduler.score.hostname}")
@@ -234,24 +228,24 @@ public class ProjectConsumer {
         client.stop();
     }
 
-    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.stop-topic}")
-    @SneakyThrows
-    public void stopManualProject(ConsumerRecord<String, String> stopRecord) {
-        log.info("ProjectConsumer--stopManualProject 接收到的项目终止消息为:" + stopRecord);
-        //1 读取 kafka 的项目停止信息
-        /*
-            {
-                "projectId": "sadfasdfs",	// 项目 id
-            }
-         */
-        String json = stopRecord.value();
-        ObjectMapper objectMapper = new ObjectMapper();
-        JsonNode jsonNode = objectMapper.readTree(json);
-        String projectId = jsonNode.path("projectId").asText();
-        manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql());
-        LinuxUtil.execute("kubectl delete job project-" + projectId);
-        redisTemplate.delete(manualProjectTopic + ":" + projectId + ":check");
-    }
+//    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.stop-topic}")
+//    @SneakyThrows
+//    public void stopManualProject(ConsumerRecord<String, String> stopRecord) {
+//        log.info("ProjectConsumer--stopManualProject 接收到的项目终止消息为:" + stopRecord);
+//        //1 读取 kafka 的项目停止信息
+//        /*
+//            {
+//                "projectId": "sadfasdfs",	// 项目 id
+//            }
+//         */
+//        String json = stopRecord.value();
+//        ObjectMapper objectMapper = new ObjectMapper();
+//        JsonNode jsonNode = objectMapper.readTree(json);
+//        String projectId = jsonNode.path("projectId").asText();
+//        manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql());
+//        LinuxUtil.execute("kubectl delete job project-" + projectId);
+//        redisTemplate.delete(manualProjectTopic + ":" + projectId + ":check");
+//    }
 
 
 }

+ 3 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/manager/TaskManager.java

@@ -177,7 +177,7 @@ public class TaskManager {
     }
 
     @SneakyThrows
-    public void score(String userId, String projectId, ClientSession session) {
+    public void score(PrefixTO redisPrefix,String userId, String projectId, ClientSession session) {
         // -------------------------------- 打分 --------------------------------
         ProjectPO projectPO = manualProjectMapper.selectById(projectId);
         String packageId = projectPO.getScenePackageId();  // 场景测试包 id,指标根 id
@@ -189,9 +189,9 @@ public class TaskManager {
         indexMapper.deleteFirstByProjectId(projectId);
         indexMapper.deleteLastByProjectId(projectId);
         //1 查询场景包对应指标
-        String allIndexTemplateListJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":package:" + packageId + ":all");
+        String allIndexTemplateListJson = stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey() + ":package:" + packageId + ":all");
         List<IndexTemplatePO> allIndexTemplateList = JsonUtil.jsonToList(allIndexTemplateListJson, IndexTemplatePO.class);
-        String leafIndexTemplateListJson = stringRedisTemplate.opsForValue().get(manualProjectTopic + ":" + userId + ":" + projectId + ":package:" + packageId + ":leaf");
+        String leafIndexTemplateListJson = stringRedisTemplate.opsForValue().get(redisPrefix.getProjectRunningKey() + ":package:" + packageId + ":leaf");
         List<IndexTemplatePO> leafIndexTemplateList = JsonUtil.jsonToList(leafIndexTemplateListJson, IndexTemplatePO.class);
         log.info("TaskManager--score 共有 " + leafIndexTemplateList.size() + "个叶子节点:" + leafIndexTemplateListJson);
         int maxLevel = 1; // 用于计算指标得分

+ 5 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/scheduler/ProjectScheduler.java

@@ -34,8 +34,6 @@ import java.util.Set;
 @Slf4j
 public class ProjectScheduler {
 
-    @Value("${scheduler.manual-project.topic}")
-    String manualProjectTopic;
     @Autowired
     StringRedisTemplate redisTemplate;
     @Autowired
@@ -46,7 +44,7 @@ public class ProjectScheduler {
     ClusterMapper clusterMapper;
     @Autowired
     ManualProjectMapper manualProjectMapper;
-    @Value("${scheduler.manual-project.job-yaml}")
+    @Value("${scheduler.project.job-yaml}")
     String jobYaml;
     @Value("${scheduler.score.hostname}")
     String hostname;
@@ -120,7 +118,10 @@ public class ProjectScheduler {
         String clusterPrefix = "cluster:" + clusterId;
         String projectJson = redisTemplate.opsForValue().get(projectWaitingKey);
         redisTemplate.delete(projectWaitingKey);
-        assert projectJson != null;
+        if (StringUtil.isEmpty(projectJson)) {
+            log.error("ProjectScheduler--run 项目 " + projectId + "的开始消息查询失败,key 为:" + projectWaitingKey);
+            return;
+        }
         redisTemplate.opsForValue().set(projectRunningKey, projectJson);
         log.info("ProjectScheduler--run 项目 " + projectId + " 从等待队列进入执行状态!");
         projectConsumer.parseManualProject(projectJson, clusterPrefix, projectRunningKey);

+ 1 - 3
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/TaskService.java

@@ -31,8 +31,6 @@ public class TaskService {
     TaskManager taskManager;
     @Autowired
     IndexTemplateMapper indexTemplateMapper;
-    @Value("${scheduler.manual-project.topic}")
-    String manualProjectTopic;
     @Value("${scheduler.score.hostname}")
     String hostname;
     @Value("${scheduler.score.username}")
@@ -79,7 +77,7 @@ public class TaskService {
         taskManager.prepareScore(redisPrefix.getProjectRunningKey());
 
         //3 打分
-        taskManager.score(userId, projectId, session);
+        taskManager.score(redisPrefix, userId, projectId, session);
 
         // -------------------------------- 收尾 --------------------------------
         manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_COMPLETED, TimeUtil.getNowForMysql());   // 修改该 project 的状态为已完成