12345678910111213141516171819202122232425262728293031323334353637383940414243444546 |
- package svc
- import (
- "cicv-data-closedloop/kinglong/common/ent"
- commonCfg "cicv-data-closedloop/kinglong/common/log"
- "cicv-data-closedloop/kinglong/common/util"
- slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
- "encoding/json"
- "sync"
- )
- var (
- prepareTimeWindowProducerQueueMutex sync.Mutex // held by PrepareTimeWindowProducerQueue while it reads, decodes and enqueues one time window
- )
- func PrepareTimeWindowProducerQueue() {
- for {
- // 等待新连接
- conn, err := slaveConfig.TcpListener.Accept()
- if err != nil {
- commonCfg.GlobalLogger.Error("接受连接错误:", err)
- continue
- }
- prepareTimeWindowProducerQueueMutex.Lock()
- {
- // 接收数据
- buffer := make([]byte, 2048)
- total, err := conn.Read(buffer)
- if err != nil {
- commonCfg.GlobalLogger.Error("读取数据错误:", err)
- continue
- }
- // 将JSON转换为结构体
- var timeWindow ent.TimeWindow
- err = json.Unmarshal(buffer[:total], &timeWindow)
- if err != nil {
- commonCfg.GlobalLogger.Error("解析Json时出错:", err)
- continue
- }
- //commonCfg.GlobalLogger.Info("接收到时间窗口:",timeWindow)
- util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
- }
- prepareTimeWindowProducerQueueMutex.Unlock()
- }
- }
|