i_kafka.go 608 B

123456789101112131415161718192021222324
  1. package infra
  2. import (
  3. "github.com/confluentinc/confluent-kafka-go/kafka"
  4. )
  5. // 定义一个全局的Kafka生产者变量
  6. var GlobalKafkaProducer *kafka.Producer
  7. // 初始化Kafka生产者的函数
  8. func InitKafkaProducer(brokers []string) {
  9. // 创建配置
  10. conf := &kafka.ConfigMap{
  11. "bootstrap.servers": brokers,
  12. "client.id": "kafka-client-cicv-data-closedloop",
  13. }
  14. // 创建生产者
  15. var err error
  16. GlobalKafkaProducer, err = kafka.NewProducer(conf)
  17. if err != nil {
  18. GlobalLogger.Errorf("初始化kafka生产者失败,配置信息为:%v,错误信息为:%v", brokers, err)
  19. }
  20. }