123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- package svc
- import (
- "cicv-data-closedloop/kinglong/common/ent"
- commonCfg "cicv-data-closedloop/kinglong/common/log"
- "cicv-data-closedloop/kinglong/common/svc"
- "cicv-data-closedloop/kinglong/common/util"
- slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
- "context"
- "encoding/json"
- "sync"
- )
- func PrepareTimeWindowProducerQueue() {
- var prepareTimeWindowProducerQueueMutex sync.Mutex
- ctx, cancel := context.WithCancel(context.Background())
- // 处理退出信号
- go func() {
- select {
- case signal := <-svc.ChannelKillWindowProducer:
- if signal == 1 {
- cancel()
- slaveConfig.TcpListener.Close()
- svc.AddKillTimes("3")
- return
- }
- }
- }()
- for {
- select {
- case <-ctx.Done():
- return
- default:
- conn, err := slaveConfig.TcpListener.Accept()
- if err != nil {
- select {
- case <-ctx.Done():
- return
- default:
- commonCfg.GlobalLogger.Error("接受连接错误:", err)
- continue
- }
- }
- prepareTimeWindowProducerQueueMutex.Lock()
- buffer := make([]byte, 2048)
- total, err := conn.Read(buffer)
- if err != nil {
- commonCfg.GlobalLogger.Error("读取数据错误:", err)
- continue
- }
- var timeWindow ent.TimeWindow
- err = json.Unmarshal(buffer[:total], &timeWindow)
- if err != nil {
- commonCfg.GlobalLogger.Error("解析Json时出错:", err)
- continue
- }
- util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
- prepareTimeWindowProducerQueueMutex.Unlock()
- }
- }
- }
|