package infra import ( "github.com/confluentinc/confluent-kafka-go/kafka" ) // 定义一个全局的Kafka生产者变量 var GlobalKafkaProducer *kafka.Producer // 初始化Kafka生产者的函数 func InitKafkaProducer(broker string) { // 创建配置 conf := &kafka.ConfigMap{ "bootstrap.servers": broker, "client.id": "kafka-client-cicv-data-closedloop", } // 创建生产者 var err error GlobalKafkaProducer, err = kafka.NewProducer(conf) if err != nil { GlobalLogger.Errorf("初始化kafka生产者失败,配置信息为:%v,错误信息为:%v", broker, err) } GlobalLogger.Errorf("初始化kafka生产者成功:%v", broker) }