123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- package svc
- import (
- "cicv-data-closedloop/kinglong/common/ent"
- "cicv-data-closedloop/kinglong/common/global"
- commonCfg "cicv-data-closedloop/kinglong/common/log"
- "cicv-data-closedloop/kinglong/common/svc"
- "cicv-data-closedloop/kinglong/common/util"
- slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
- "encoding/json"
- "sync"
- )
// prepareTimeWindowProducerQueueMutex is held by
// PrepareTimeWindowProducerQueue while it reads a message from an accepted
// connection, decodes it, and appends the result to the producer queue.
var prepareTimeWindowProducerQueueMutex sync.Mutex
- func PrepareTimeWindowProducerQueue() {
- for {
- select {
- case signal := <-svc.ChannelKillWindowProducer:
- if signal == 1 {
- if len(global.TimeWindowConsumerQueue) == 0 {
- svc.AddKillTimes("3")
- return
- }
- }
- default:
- }
- // 等待新连接
- conn, err := slaveConfig.TcpListener.Accept()
- if err != nil {
- commonCfg.GlobalLogger.Error("接受连接错误:", err)
- continue
- }
- prepareTimeWindowProducerQueueMutex.Lock()
- // 接收数据
- buffer := make([]byte, 2048)
- total, err := conn.Read(buffer)
- if err != nil {
- commonCfg.GlobalLogger.Error("读取数据错误:", err)
- continue
- }
- // 将JSON转换为结构体
- var timeWindow ent.TimeWindow
- err = json.Unmarshal(buffer[:total], &timeWindow)
- if err != nil {
- commonCfg.GlobalLogger.Error("解析Json时出错:", err)
- continue
- }
- //commonCfg.GlobalLogger.Info("接收到时间窗口:",timeWindow)
- util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
- prepareTimeWindowProducerQueueMutex.Unlock()
- }
- }
|