accept_window.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. package svc
  2. import (
  3. "cicv-data-closedloop/kinglong/common/ent"
  4. commonCfg "cicv-data-closedloop/kinglong/common/log"
  5. "cicv-data-closedloop/kinglong/common/util"
  6. slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
  7. "encoding/json"
  8. "sync"
  9. )
  10. var (
  11. prepareTimeWindowProducerQueueMutex sync.Mutex
  12. )
  13. func PrepareTimeWindowProducerQueue() {
  14. for {
  15. // 等待新连接
  16. conn, err := slaveConfig.TcpListener.Accept()
  17. if err != nil {
  18. commonCfg.GlobalLogger.Error("接受连接错误:", err)
  19. continue
  20. }
  21. prepareTimeWindowProducerQueueMutex.Lock()
  22. {
  23. // 接收数据
  24. buffer := make([]byte, 2048)
  25. total, err := conn.Read(buffer)
  26. if err != nil {
  27. commonCfg.GlobalLogger.Error("读取数据错误:", err)
  28. continue
  29. }
  30. // 将JSON转换为结构体
  31. var timeWindow ent.TimeWindow
  32. err = json.Unmarshal(buffer[:total], &timeWindow)
  33. if err != nil {
  34. commonCfg.GlobalLogger.Error("解析Json时出错:", err)
  35. continue
  36. }
  37. //commonCfg.GlobalLogger.Info("接收到时间窗口:",timeWindow)
  38. util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
  39. }
  40. prepareTimeWindowProducerQueueMutex.Unlock()
  41. }
  42. }