123456789101112131415161718192021222324 |
- package infra
- import (
- "github.com/confluentinc/confluent-kafka-go/kafka"
- )
- // 定义一个全局的Kafka生产者变量
- var GlobalKafkaProducer *kafka.Producer
- // 初始化Kafka生产者的函数
- func InitKafkaProducer(brokers []string) {
- // 创建配置
- conf := &kafka.ConfigMap{
- "bootstrap.servers": brokers,
- "client.id": "kafka-client-cicv-data-closedloop",
- }
- // 创建生产者
- var err error
- GlobalKafkaProducer, err = kafka.NewProducer(conf)
- if err != nil {
- GlobalLogger.Errorf("初始化kafka生产者失败,配置信息为:%v,错误信息为:%v", brokers, err)
- }
- }
|