Browse Source

使用自己的 kafka

martin 2 years ago
parent
commit
422276737c

+ 49 - 4
simulation-resource-server/src/main/java/com/css/simulation/resource/project/impl/SimulationProjectServiceImpl.java

@@ -288,7 +288,7 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
             //Kafka推送消息
             projectRunToKafka(po);
 
-        } else {
+        } else {    //创建新的项目
             if ("20".equals(param.getNowRunState())) {
                 //设置开始时间
                 param.setStartTime(new Date());
@@ -333,7 +333,22 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
         String data = JsonUtil.beanToJson(kafkaParam);
         kafkaParameter.setData(data);
         log.info("推送项目运行消息到kafka:" + data);
-        kafkaService.send(kafkaParameter);
+//        kafkaService.send(kafkaParameter);
+        kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
+            // 消息发送到的topic
+            String topic = success.getRecordMetadata().topic();
+            // 消息发送到的分区
+            int partition = success.getRecordMetadata().partition();
+            // 消息在分区内的offset
+            long offset = success.getRecordMetadata().offset();
+            log.info("------- 发送消息成功:\n"
+                    + "主题 topic 为:" + topic + "\n"
+                    + "分区 partition 为:" + partition + "\n"
+                    + "偏移量为:" + offset + "\n"
+                    + "消息体为:" + kafkaParameter.getData());
+        }, failure -> {
+            log.error("发送消息失败:" + failure.getMessage());
+        });
     }
 
     private void projectStopToKafka(SimulationManualProjectPo po) throws JsonProcessingException {
@@ -345,7 +360,22 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
         String data = JsonUtil.beanToJson(kafkaParam);
         kafkaParameter.setData(data);
         log.info("推送项目中止消息到kafka:" + data);
-        kafkaService.send(kafkaParameter);
+//        kafkaService.send(kafkaParameter);
+        kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
+            // 消息发送到的topic
+            String topic = success.getRecordMetadata().topic();
+            // 消息发送到的分区
+            int partition = success.getRecordMetadata().partition();
+            // 消息在分区内的offset
+            long offset = success.getRecordMetadata().offset();
+            log.info("------- 发送消息成功:\n"
+                    + "主题 topic 为:" + topic + "\n"
+                    + "分区 partition 为:" + partition + "\n"
+                    + "偏移量为:" + offset + "\n"
+                    + "消息体为:" + kafkaParameter.getData());
+        }, failure -> {
+            log.error("发送消息失败:" + failure.getMessage());
+        });
 
     }
 
@@ -358,7 +388,22 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
         String data = JsonUtil.beanToJson(kafkaParam);
         kafkaParameter.setData(data);
         log.info("推送自动项目中止消息到kafka:" + data);
-        kafkaService.send(kafkaParameter);
+//        kafkaService.send(kafkaParameter);
+        kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
+            // 消息发送到的topic
+            String topic = success.getRecordMetadata().topic();
+            // 消息发送到的分区
+            int partition = success.getRecordMetadata().partition();
+            // 消息在分区内的offset
+            long offset = success.getRecordMetadata().offset();
+            log.info("------- 发送消息成功:\n"
+                    + "主题 topic 为:" + topic + "\n"
+                    + "分区 partition 为:" + partition + "\n"
+                    + "偏移量为:" + offset + "\n"
+                    + "消息体为:" + kafkaParameter.getData());
+        }, failure -> {
+            log.error("发送消息失败:" + failure.getMessage());
+        });
 
     }