1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- package svc
- import (
- "cicv-data-closedloop/kinglong/common/ent"
- "cicv-data-closedloop/kinglong/common/global"
- commonCfg "cicv-data-closedloop/kinglong/common/log"
- "cicv-data-closedloop/kinglong/common/svc"
- "cicv-data-closedloop/kinglong/common/util"
- slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
- "encoding/json"
- "sync"
- )
- var (
- prepareTimeWindowProducerQueueMutex sync.Mutex
- )
- func PrepareTimeWindowProducerQueue() {
- for {
- select {
- case signal := <-svc.ChannelKillWindowProducer:
- if signal == 1 {
- if len(global.TimeWindowConsumerQueue) == 0 {
- svc.AddKillTimes("3")
- return
- }
- }
- default:
- }
- // 等待新连接
- conn, err := slaveConfig.TcpListener.Accept()
- if err != nil {
- commonCfg.GlobalLogger.Error("接受连接错误:", err)
- continue
- }
- prepareTimeWindowProducerQueueMutex.Lock()
- {
- // 接收数据
- buffer := make([]byte, 2048)
- total, err := conn.Read(buffer)
- if err != nil {
- commonCfg.GlobalLogger.Error("读取数据错误:", err)
- continue
- }
- // 将JSON转换为结构体
- var timeWindow ent.TimeWindow
- err = json.Unmarshal(buffer[:total], &timeWindow)
- if err != nil {
- commonCfg.GlobalLogger.Error("解析Json时出错:", err)
- continue
- }
- //commonCfg.GlobalLogger.Info("接收到时间窗口:",timeWindow)
- util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
- }
- prepareTimeWindowProducerQueueMutex.Unlock()
- }
- }
|