package svc import ( "cicv-data-closedloop/kinglong/common/ent" commonCfg "cicv-data-closedloop/kinglong/common/log" "cicv-data-closedloop/kinglong/common/svc" "cicv-data-closedloop/kinglong/common/util" slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg" "context" "encoding/json" "sync" ) func PrepareTimeWindowProducerQueue() { var prepareTimeWindowProducerQueueMutex sync.Mutex ctx, cancel := context.WithCancel(context.Background()) // 处理退出信号 go func() { select { case signal := <-svc.ChannelKillWindowProducer: if signal == 1 { cancel() slaveConfig.TcpListener.Close() svc.AddKillTimes("3") return } } }() for { select { case <-ctx.Done(): return default: conn, err := slaveConfig.TcpListener.Accept() if err != nil { select { case <-ctx.Done(): return default: commonCfg.GlobalLogger.Error("接受连接错误:", err) continue } } prepareTimeWindowProducerQueueMutex.Lock() buffer := make([]byte, 2048) total, err := conn.Read(buffer) if err != nil { commonCfg.GlobalLogger.Error("读取数据错误:", err) continue } var timeWindow ent.TimeWindow err = json.Unmarshal(buffer[:total], &timeWindow) if err != nil { commonCfg.GlobalLogger.Error("解析Json时出错:", err) continue } util.AddTimeWindowToTimeWindowProducerQueue(timeWindow) prepareTimeWindowProducerQueueMutex.Unlock() } } }