Prechádzať zdrojové kódy

不通过 common 发送到 kafka

martin 2 rokov pred
rodič
commit
0f84a3b11f

+ 7 - 7
simulation-resource-monitor/src/main/java/com/css/simulation/resource/monitor/controller/ProjectTaskCtrl.java

@@ -2,25 +2,25 @@ package com.css.simulation.resource.monitor.controller;
 
 import api.common.pojo.common.ResponseBodyVO;
 import com.css.simulation.resource.monitor.scheduler.ProjectScheduler;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import javax.annotation.Resource;
 import java.util.List;
 
 @Controller
 @RequestMapping("/projectTask")
 public class ProjectTaskCtrl {
 
-    @Autowired
-    ProjectScheduler scheduler;
+    @Resource
+    ProjectScheduler projectScheduler;
 
     @RequestMapping("/init")
     @ResponseBody
     public ResponseBodyVO init(@RequestBody List<ProjectScheduler.ProjectTask> task){
-        int count = scheduler.init(task);
+        int count = projectScheduler.init(task);
         return new ResponseBodyVO(ResponseBodyVO.Response.SUCCESS,count);
     }
 
@@ -28,7 +28,7 @@ public class ProjectTaskCtrl {
     @RequestMapping("/start")
     @ResponseBody
     public ResponseBodyVO start(@RequestBody ProjectScheduler.ProjectTask task){
-        boolean b = scheduler.start(task);
+        boolean b = projectScheduler.start(task);
         if(b){
             return new ResponseBodyVO(ResponseBodyVO.Response.SUCCESS);
         }
@@ -38,7 +38,7 @@ public class ProjectTaskCtrl {
     @RequestMapping("/stop")
     @ResponseBody
     public ResponseBodyVO stop(@RequestBody ProjectScheduler.ProjectTask task){
-        boolean b = scheduler.stop(task);
+        boolean b = projectScheduler.stop(task);
         if(b){
             return new ResponseBodyVO(ResponseBodyVO.Response.SUCCESS);
         }
@@ -48,7 +48,7 @@ public class ProjectTaskCtrl {
     @RequestMapping("/destroy")
     @ResponseBody
     public ResponseBodyVO destroy(){
-        scheduler.destroy();
+        projectScheduler.destroy();
         return new ResponseBodyVO(ResponseBodyVO.Response.SUCCESS);
     }
 }

+ 2 - 3
simulation-resource-monitor/src/main/java/com/css/simulation/resource/monitor/scheduler/MyScheduler.java

@@ -11,11 +11,10 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
 import org.dom4j.DocumentException;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -30,7 +29,7 @@ public class MyScheduler {
     List<Host> hostList;
 
 
-    @Autowired
+    @Resource
     SystemServerManager systemServerManager;
 
 

+ 36 - 44
simulation-resource-monitor/src/main/java/com/css/simulation/resource/monitor/scheduler/ProjectScheduler.java

@@ -2,11 +2,10 @@ package com.css.simulation.resource.monitor.scheduler;
 
 import api.common.util.ObjectUtil;
 import com.css.simulation.resource.monitor.feign.ProjectService;
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.Trigger;
-import org.springframework.scheduling.TriggerContext;
 import org.springframework.scheduling.annotation.SchedulingConfigurer;
 import org.springframework.scheduling.config.ScheduledTaskRegistrar;
 import org.springframework.scheduling.support.CronExpression;
@@ -14,17 +13,15 @@ import org.springframework.scheduling.support.CronTrigger;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import javax.annotation.Resource;
+import java.util.*;
 import java.util.concurrent.ScheduledFuture;
 
 @Component
 @Slf4j
 public class ProjectScheduler implements SchedulingConfigurer {
 
-    @Autowired
+    @Resource
     ProjectService projectService;
 
     static ProjectService staticProjectService;
@@ -46,18 +43,19 @@ public class ProjectScheduler implements SchedulingConfigurer {
     /**
      * 初始化任务
      */
-    public int init(List<ProjectTask> projectTask){
-        if(ObjectUtil.isNull(projectTask)){
+    public int init(List<ProjectTask> projectTask) {
+        if (ObjectUtil.isNull(projectTask)) {
             return 0;
         }
         //停止旧任务
         destroy();
         int count = 0;
         for (ProjectTask task : projectTask) {
-            if(start(task)){
+            if (start(task)) {
                 count++;
             }
-        };
+        }
+        ;
         //返回启动成功的任务数量
         return count;
     }
@@ -65,51 +63,47 @@ public class ProjectScheduler implements SchedulingConfigurer {
     /**
      * 开启任务
      */
-    public boolean start(ProjectTask task){
-        if(!validate(task)){
+    public boolean start(ProjectTask task) {
+        if (!validate(task)) {
             return false;
-        };
+        }
         String projectId = task.getProjectId();
         String cron = task.getCron();
         //任务已经存在,并且cron表达式没有变动,直接返回
-        if(scheduledFutures.containsKey(projectId)){
-            if(projectTasks.containsKey(projectId)){
+        if (scheduledFutures.containsKey(projectId)) {
+            if (projectTasks.containsKey(projectId)) {
                 ProjectTask projectTask = projectTasks.get(projectId);
-                if(projectTask.getCron().equals(cron)){
+                if (projectTask.getCron().equals(cron)) {
                     return true;
-                }else{
+                } else {
                     //有变动就停止
                     scheduledFutures.get(projectId).cancel(false);
                 }
-            }else{
+            } else {
                 return false;
             }
         }
         //开启新任务
-        ScheduledFuture<?> schedule = taskRegistrar.getScheduler().schedule(task, new Trigger() {
-            @Override
-            public Date nextExecutionTime(TriggerContext triggerContext) {
-                CronTrigger cronTrigger = new CronTrigger(cron);
-                Date date = cronTrigger.nextExecutionTime(triggerContext);
-                return date;
-            }
+        ScheduledFuture<?> schedule = Objects.requireNonNull(taskRegistrar.getScheduler()).schedule(task, triggerContext -> {
+            CronTrigger cronTrigger = new CronTrigger(cron);
+            return cronTrigger.nextExecutionTime(triggerContext);
         });
         //放入缓存
-        projectTasks.put(projectId,task);
-        scheduledFutures.put(projectId,schedule);
+        projectTasks.put(projectId, task);
+        scheduledFutures.put(projectId, schedule);
         return true;
     }
 
     /**
      * 停止任务
      */
-    public boolean stop(ProjectTask task){
-        if(ObjectUtil.isNull(task) || ObjectUtil.isNull(task.getProjectId())){
+    public boolean stop(ProjectTask task) {
+        if (ObjectUtil.isNull(task) || ObjectUtil.isNull(task.getProjectId())) {
             return false;
         }
         //任务已经存在,停止
         String projectId = task.getProjectId();
-        if(scheduledFutures.containsKey(projectId)){
+        if (scheduledFutures.containsKey(projectId)) {
             scheduledFutures.get(projectId).cancel(false);
             //清除缓存
             projectTasks.remove(projectId);
@@ -122,9 +116,9 @@ public class ProjectScheduler implements SchedulingConfigurer {
     /**
      * 清空所有任务
      */
-    public void destroy(){
+    public void destroy() {
         //停止现有任务
-        scheduledFutures.forEach((k,v)->{
+        scheduledFutures.forEach((k, v) -> {
             //运行中不允许中断,完成后再停止
             v.cancel(false);
         });
@@ -134,16 +128,16 @@ public class ProjectScheduler implements SchedulingConfigurer {
     }
 
     private boolean validate(ProjectTask task) {
-        if(ObjectUtil.isNull(task)){
+        if (ObjectUtil.isNull(task)) {
             return false;
         }
-        if(ObjectUtil.isNull(task.getProjectId()) || ObjectUtil.isNull(task.getCron())){
+        if (ObjectUtil.isNull(task.getProjectId()) || ObjectUtil.isNull(task.getCron())) {
             return false;
         }
         //校验cron表达式
         try {
             CronExpression.parse(task.getCron());
-        }catch (Exception e){
+        } catch (Exception e) {
             return false;
         }
         return true;
@@ -153,19 +147,17 @@ public class ProjectScheduler implements SchedulingConfigurer {
      * 继承该类,重写run方法,即可自定义任务执行逻辑
      */
     @Data
-    public static class ProjectTask implements Runnable{
-
-        public ProjectTask() {}
-
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class ProjectTask implements Runnable {
         private String projectId;
-
         private String cron;
 
         @Override
         public void run() {
             log.info("projectTask [" + projectId + "] run : " + new Date());
-            Map<String,String> projectParam = new HashMap<>();
-            projectParam.put("id",projectId);
+            Map<String, String> projectParam = new HashMap<>();
+            projectParam.put("id", projectId);
             ProjectScheduler.staticProjectService.runProject(projectParam);
         }
     }

+ 3 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/service/ProjectService.java

@@ -337,6 +337,7 @@ public class ProjectService {
         } else { // 从索为算法平台下载算法
             dockerImage = "algorithm_" + algorithmId + ":latest";
             algorithmTarLinuxTempPath = linuxTempPath + "algorithm/" + algorithmId + ".tar";
+            //1 获取 token
             String tokenUri = "http://open-api.zoogooy.com/cgi-bin/token/token?grant_type=client_credential";
             String appid = "3e64be4a29e5478f9717d53c11ab26ad";
             String secret = "f183079f97ac9ed81a864619a83fc17a";
@@ -346,9 +347,11 @@ public class ProjectService {
             JsonNode jsonNode = objectMapper.readTree(result);
             JsonNode dataNode = jsonNode.path("data");
             String token = dataNode.path("access_token").asText();
+            //2 获取 下载地址
             String downloadUrl = "http://open-api.zoogooy.com/cgi-bin/api/icv-algorithm-agg/simulation/download"
                     + "?access_token=" + token
                     + "&id=" + algorithmId;
+            //3 下载算法包
             String tempDownloadUrl = HttpUtil.get(closeableHttpClient, requestConfig, downloadUrl);
             InputStream inputStream = HttpUtil.getInputStream(closeableHttpClient, requestConfig, tempDownloadUrl);
             FileUtil.writeInputStreamToLocalFile(inputStream, algorithmTarLinuxTempPath);

+ 13 - 1
simulation-resource-server/pom.xml

@@ -3,8 +3,8 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>simulation-cloud</artifactId>
         <groupId>com.css</groupId>
+        <artifactId>simulation-cloud</artifactId>
         <version>1.0</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
@@ -17,6 +17,18 @@
 
     <dependencies>
 
+        <!-- apache kafka - 开始 -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka-clients.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <!-- apache kafka - 结束 -->
+
        <!--  测试excel解析-->
         <dependency>
             <groupId>org.apache.poi</groupId>

+ 57 - 45
simulation-resource-server/src/main/java/com/css/simulation/resource/project/impl/SimulationProjectServiceImpl.java

@@ -45,6 +45,7 @@ import feign.Response;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.scheduling.support.CronExpression;
 import org.springframework.stereotype.Service;
 import org.springframework.web.context.request.RequestContextHolder;
@@ -89,6 +90,8 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
     @Resource
     KafkaService kafkaService;
     @Resource
+    KafkaTemplate<String, String> kafkaTemplate;
+    @Resource
     DictService dictService;
 
     @Resource
@@ -3718,55 +3721,48 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
 
     @Override
     public ResponseBodyVO updateAutomaticRunState(SimulationManualProjectParam param) {
-
         String id = param.getId();
-        if (isEmpty(id)) {
-            return new ResponseBodyVO(ResponseBodyVO.Response.CLIENT_FAILURE, "id不能为空");
-        }
-        if (isEmpty(param.getAutomaticRunState())) {
-            return new ResponseBodyVO(ResponseBodyVO.Response.CLIENT_FAILURE, "自动运行状态不能为空");
-        }
+        String automaticRunState = param.getAutomaticRunState();
+        Optional.ofNullable(id).orElseThrow(() -> new RuntimeException("id不能为空。"));
+        Optional.ofNullable(automaticRunState).orElseThrow(() -> new RuntimeException("自动运行状态不能为空。"));
 
-        int i = simulationAutomaticProjectMapper.updateAutomaticRunState(param);
-        if (i > 0) {
-            SimulationAutomaticProjectPo po = simulationAutomaticProjectMapper.selectById(id);
-            String algorithmId = po.getAlgorithm();
-            if ("0".equals(param.getAutomaticRunState())) {
-                //启动
-                //检查算法版本
-                //获取数据库中的算法版本
-                AlgorithmPO aPo = algorithmMapper.selectDetailsById(algorithmId);
-                String gitVersion = aPo.getGitVersion();
-                //获取当前算法版本
-                String currentGitVersion = algorithmService.getGitVersion(algorithmId);
-
-                boolean isRun = false; //判断是否启动运行
-                //首次获取版本
-                if (StringUtil.isEmpty(gitVersion)) {
+        //1 修改父项目停用状态
+        simulationAutomaticProjectMapper.updateAutomaticRunState(param);
+        SimulationAutomaticProjectPo po = simulationAutomaticProjectMapper.selectById(id);  // 查询父项目信息
+        String algorithmId = po.getAlgorithm();
+        if ("0".equals(param.getAutomaticRunState())) {
+            //启动
+            //检查算法版本
+            //获取数据库中的算法版本
+            AlgorithmPO aPo = algorithmMapper.selectDetailsById(algorithmId);
+            String gitVersion = aPo.getGitVersion();
+            //获取当前算法版本
+            String currentGitVersion = algorithmService.getGitVersion(algorithmId);
+            boolean isRun = false; //判断是否启动运行
+            //首次获取版本
+            if (StringUtil.isEmpty(gitVersion)) {
+                isRun = true;
+            } else {
+                //非首次,比对版本,校验是否执行任务
+                if (!gitVersion.equals(currentGitVersion)) {
                     isRun = true;
-                } else {
-                    //非首次,比对版本,校验是否执行任务
-                    if (!gitVersion.equals(currentGitVersion)) {
-                        isRun = true;
-                    }
                 }
-                if (isRun) { //启动运行
-                    AlgorithmParameter algorithmParam = new AlgorithmParameter();
-                    algorithmParam.setId(algorithmId);
-                    algorithmParam.setGitVersion(currentGitVersion);
-                    runProject(algorithmParam, param, po);
-                }
-            } else if ("1".equals(param.getAutomaticRunState())) {
-                //停止
-                //推送定时请求
-                projectTaskStop(po);
             }
+            if (isRun) { //启动运行
+                AlgorithmParameter algorithmParam = new AlgorithmParameter();
+                algorithmParam.setId(algorithmId);
+                algorithmParam.setGitVersion(currentGitVersion);
+                runProject(algorithmParam, param, po);
+            }
+        } else if ("1".equals(param.getAutomaticRunState())) {
+            //停止
+            //推送定时请求
+            projectTaskStop(po);
+        }
 
 
-            return new ResponseBodyVO(ResponseBodyVO.Response.SUCCESS);
-        } else {
-            return new ResponseBodyVO(ResponseBodyVO.Response.SERVER_FAILURE, "操作失败");
-        }
+        return new ResponseBodyVO(ResponseBodyVO.Response.SUCCESS);
+
 
         /*String automaticRunState = param.getAutomaticRunState();
         SimulationAutomaticProjectPo po = new SimulationAutomaticProjectPo();
@@ -3845,7 +3841,7 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
         subprojectPo.setParentProjectId(po.getProjectId());
         subprojectPo.setProjectName(projectName);
         subprojectPo.setStartTime(new Date());
-        subprojectPo.setNowRunState(ProjectRunStateEnum.EXECUTION.getCode());//行中
+        subprojectPo.setNowRunState(ProjectRunStateEnum.EXECUTION.getCode());//行中
 
         SimulationAutomaticSubProjectPo sPo = simulationAutomaticSubProjectMapper.selectLastProjectId(param.getId());
         if (StringUtil.isEmpty(sPo)) {
@@ -3892,8 +3888,24 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
         kafkaParameter.setTopic(ProjectConstants.RUN_TASK_TOPIC);
         String data = JsonUtil.beanToJson(kafkaParam);
         kafkaParameter.setData(data);
-        log.info("自动运行项目推送消息到kafka:" + data);
-        kafkaService.send(kafkaParameter);
+        log.info("自动运行项目推送消息到 kafka:" + data);
+//        kafkaService.send(kafkaParameter);
+
+        kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
+            // 消息发送到的topic
+            String topic = success.getRecordMetadata().topic();
+            // 消息发送到的分区
+            int partition = success.getRecordMetadata().partition();
+            // 消息在分区内的offset
+            long offset = success.getRecordMetadata().offset();
+            log.info("------- 发送消息成功:\n"
+                    + "主题 topic 为:" + topic + "\n"
+                    + "分区 partition 为:" + partition + "\n"
+                    + "偏移量为:" + offset + "\n"
+                    + "消息体为:" + kafkaParameter.getData());
+        }, failure -> {
+            log.error("发送消息失败:" + failure.getMessage());
+        });
     }
 
     @Override

+ 36 - 0
simulation-resource-server/src/main/java/com/css/simulation/resource/project/kafka/ApacheKafkaPartitioner.java

@@ -0,0 +1,36 @@
+package com.css.simulation.resource.project.kafka;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Map;
+
+
+/**
+ * 自定义分区器
+ */
+@Configuration
+public class ApacheKafkaPartitioner implements Partitioner {
+
+
+    /**
+     * 返回分区值。
+     * 全部发送到 0 号分区, 保证消息有序性
+     */
+    @Override
+    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
+
+        return 0;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}