package svc

import (
	"encoding/json"
	"sync"

	"cicv-data-closedloop/kinglong/common/ent"
	"cicv-data-closedloop/kinglong/common/global"
	commonCfg "cicv-data-closedloop/kinglong/common/log"
	"cicv-data-closedloop/kinglong/common/svc"
	"cicv-data-closedloop/kinglong/common/util"
	slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
)

var (
	// prepareTimeWindowProducerQueueMutex serializes the read/decode/enqueue
	// section of PrepareTimeWindowProducerQueue.
	prepareTimeWindowProducerQueueMutex sync.Mutex
)

// PrepareTimeWindowProducerQueue loops forever accepting connections on
// slaveConfig.TcpListener. From each connection it reads a single message of
// at most 2048 bytes, decodes it as JSON into an ent.TimeWindow and hands the
// result to util.AddTimeWindowToTimeWindowProducerQueue.
//
// Before every Accept it performs a non-blocking receive on
// svc.ChannelKillWindowProducer. When the received signal is 1 and
// global.TimeWindowConsumerQueue is empty, it calls svc.AddKillTimes("3") and
// returns. Because Accept blocks, the kill channel is only polled between
// connections.
//
// Read and decode failures are logged and the connection is dropped; the loop
// then continues with the next connection.
func PrepareTimeWindowProducerQueue() {
	for {
		// Non-blocking poll of the kill channel.
		select {
		case signal := <-svc.ChannelKillWindowProducer:
			if signal == 1 && len(global.TimeWindowConsumerQueue) == 0 {
				svc.AddKillTimes("3")
				return
			}
		default:
		}

		// Wait for a new connection.
		conn, err := slaveConfig.TcpListener.Accept()
		if err != nil {
			commonCfg.GlobalLogger.Error("接受连接错误:", err)
			continue
		}

		// Handle the connection inside a function literal so the deferred
		// Unlock and Close run at the end of this iteration on every path.
		// Previously the error paths skipped Unlock, which deadlocked the
		// next iteration on Lock, and the connection was never closed.
		func() {
			// Best-effort close: the payload has already been consumed (or
			// the read failed), so a close error is not actionable here.
			defer func() { _ = conn.Close() }()

			prepareTimeWindowProducerQueueMutex.Lock()
			defer prepareTimeWindowProducerQueueMutex.Unlock()

			// Receive the payload.
			buffer := make([]byte, 2048)
			total, err := conn.Read(buffer)
			if err != nil {
				commonCfg.GlobalLogger.Error("读取数据错误:", err)
				return
			}

			// Decode the JSON payload into a time window.
			var timeWindow ent.TimeWindow
			if err := json.Unmarshal(buffer[:total], &timeWindow); err != nil {
				commonCfg.GlobalLogger.Error("解析Json时出错:", err)
				return
			}

			//commonCfg.GlobalLogger.Info("接收到时间窗口:",timeWindow)
			util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
		}()
	}
}