package svc import ( "cicv-data-closedloop/kinglong/common/ent" commonCfg "cicv-data-closedloop/kinglong/common/log" "cicv-data-closedloop/kinglong/common/util" slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg" "encoding/json" "sync" ) var ( prepareTimeWindowProducerQueueMutex sync.Mutex ) func PrepareTimeWindowProducerQueue() { for { // 等待新连接 conn, err := slaveConfig.TcpListener.Accept() if err != nil { commonCfg.GlobalLogger.Error("接受连接错误:", err) continue } prepareTimeWindowProducerQueueMutex.Lock() { // 接收数据 buffer := make([]byte, 2048) total, err := conn.Read(buffer) if err != nil { commonCfg.GlobalLogger.Error("读取数据错误:", err) continue } // 将JSON转换为结构体 var timeWindow ent.TimeWindow err = json.Unmarshal(buffer[:total], &timeWindow) if err != nil { commonCfg.GlobalLogger.Error("解析Json时出错:", err) continue } //commonCfg.GlobalLogger.Info("接收到时间窗口:",timeWindow) util.AddTimeWindowToTimeWindowProducerQueue(timeWindow) } prepareTimeWindowProducerQueueMutex.Unlock() } }