martin 3 년 전
부모
커밋
43234779c6

+ 50 - 0
simulation-resource-common/src/main/java/com/css/simulation/resource/common/controller/KafkaController.java

@@ -1,7 +1,57 @@
 package com.css.simulation.resource.common.controller;
 
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.annotation.Resource;
+
 @RestController
+@RequestMapping("/kafka")
 public class KafkaController {
+
+        @Resource
+        private KafkaTemplate<String, Object> kafkaTemplate;
+
+        // -------------------------------- 简单发送 --------------------------------
+
+        @RequestMapping("/send")
+        public void sendMessage1(String normalMessage) {
+            kafkaTemplate.send("topic1",0,"key", normalMessage);
+        }
+
+        // -------------------------------- 带回调函数的发送 --------------------------------
+        public void sendMessage2(String callbackMessage) {
+            kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
+                // 消息发送到的topic
+                String topic = success.getRecordMetadata().topic();
+                // 消息发送到的分区
+                int partition = success.getRecordMetadata().partition();
+                // 消息在分区内的offset
+                long offset = success.getRecordMetadata().offset();
+                System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
+            }, failure -> {
+                System.out.println("发送消息失败:" + failure.getMessage());
+            });
+        }
+
+
+        public void sendMessage3(String callbackMessage) {
+            kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
+                @Override
+                public void onFailure(Throwable throwable) {
+                    System.out.println("发送消息失败:" + throwable.getMessage());
+                }
+
+                @Override
+                public void onSuccess(SendResult<String, Object> result) {
+                    System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+                            + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
+                }
+            });
+        }
+
+
 }

+ 41 - 0
simulation-resource-common/src/main/java/com/css/simulation/resource/common/controller/advice/AllExceptionHandlerAdvice.java

@@ -0,0 +1,41 @@
+package com.css.simulation.resource.common.controller.advice;
+
+import api.common.pojo.common.ResponseBodyVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.web.bind.MethodArgumentNotValidException;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.RestControllerAdvice;
+
+import java.util.Objects;
+
+/**
+ * 全局异常处理
+ *
+ * @author martin
+ */
+@RestControllerAdvice
+@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
+@Slf4j
+public class AllExceptionHandlerAdvice {
+
+    /**
+     * 方法参数校验
+     */
+    @ExceptionHandler(MethodArgumentNotValidException.class)
+    public ResponseBodyVO<Object> handleMethodArgumentNotValidException(MethodArgumentNotValidException e) {
+        log.error(e.getMessage(), e);
+        return new ResponseBodyVO<>(ResponseBodyVO.Response.SERVER_FAILURE, Objects.requireNonNull(e.getBindingResult().getFieldError()).getDefaultMessage());
+    }
+
+    /**
+     * 服务器错误异常统一处理
+     */
+    @ExceptionHandler(Exception.class)
+    public ResponseBodyVO<Object> handleException(Exception e) {
+        log.error(e.getMessage(), e);
+        return new ResponseBodyVO<>(ResponseBodyVO.Response.SERVER_FAILURE, "服务器错误!");
+    }
+
+}

+ 37 - 0
simulation-resource-common/src/main/resources/bootstrap.yml

@@ -0,0 +1,37 @@
+server:
+  port: 8001
+  servlet:
+    context-path: /simulation/resource/scheduler
+
+spring:
+  application:
+    name: simulation-resource-scheduler
+  #* -------------------------------- 环境配置 --------------------------------
+  profiles:
+    active: dev
+  #* -------------------------------- 微服务配置 --------------------------------
+  cloud:
+    nacos:
+      discovery:
+        server-addr: 10.15.12.70:8848
+        namespace: 3698bfc2-a612-487a-b2a2-aaad16cd9d9d
+      config:
+        server-addr: 10.15.12.70:8848
+        namespace: 3698bfc2-a612-487a-b2a2-aaad16cd9d9d
+        file-extension: yaml
+  #* -------------------------------- Kafka --------------------------------
+  kafka:
+    bootstrap-servers: 10.15.12.70:9092    # 指定连接的 kafka 集群。
+    listener:
+      missing-topics-fatal: false   # 消费端监听的topic不存在时,项目启动会报错(关掉)
+    #      type: batch # 设置批量消费
+    producer: # 生产者配置
+      retries: 3                          # 重试次数。
+      acks: 1                             # 指定 ack 应答级别。0,1,-1
+      batch-size: 16384                   # 批次大小
+      buffer-memory: 33554432             # 缓冲区大小
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      properties:
+        linger:
+          ms: 0 # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

+ 41 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/controller/advice/AllExceptionHandlerAdvice.java

@@ -0,0 +1,41 @@
+package com.css.simulation.resource.scheduler.controller.advice;
+
+import api.common.pojo.common.ResponseBodyVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.web.bind.MethodArgumentNotValidException;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.RestControllerAdvice;
+
+import java.util.Objects;
+
+/**
+ * 全局异常处理
+ *
+ * @author martin
+ */
+@RestControllerAdvice
+@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
+@Slf4j
+public class AllExceptionHandlerAdvice {
+
+    /**
+     * 方法参数校验
+     */
+    @ExceptionHandler(MethodArgumentNotValidException.class)
+    public ResponseBodyVO<Object> handleMethodArgumentNotValidException(MethodArgumentNotValidException e) {
+        log.error(e.getMessage(), e);
+        return new ResponseBodyVO<>(ResponseBodyVO.Response.SERVER_FAILURE, Objects.requireNonNull(e.getBindingResult().getFieldError()).getDefaultMessage());
+    }
+
+    /**
+     * 服务器错误异常统一处理
+     */
+    @ExceptionHandler(Exception.class)
+    public ResponseBodyVO<Object> handleException(Exception e) {
+        log.error(e.getMessage(), e);
+        return new ResponseBodyVO<>(ResponseBodyVO.Response.SERVER_FAILURE, "服务器错误!");
+    }
+
+}