accept_window.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package svc
  2. import (
  3. "cicv-data-closedloop/kinglong/common/ent"
  4. "cicv-data-closedloop/kinglong/common/global"
  5. commonCfg "cicv-data-closedloop/kinglong/common/log"
  6. "cicv-data-closedloop/kinglong/common/svc"
  7. "cicv-data-closedloop/kinglong/common/util"
  8. slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
  9. "encoding/json"
  10. "sync"
  11. )
  12. var (
  13. prepareTimeWindowProducerQueueMutex sync.Mutex
  14. )
  15. func PrepareTimeWindowProducerQueue() {
  16. for {
  17. select {
  18. case signal := <-svc.ChannelKillWindowProducer:
  19. if signal == 1 {
  20. if len(global.TimeWindowConsumerQueue) == 0 {
  21. svc.AddKillTimes("3")
  22. return
  23. }
  24. }
  25. default:
  26. }
  27. // 等待新连接
  28. conn, err := slaveConfig.TcpListener.Accept()
  29. if err != nil {
  30. commonCfg.GlobalLogger.Error("接受连接错误:", err)
  31. continue
  32. }
  33. prepareTimeWindowProducerQueueMutex.Lock()
  34. // 接收数据
  35. buffer := make([]byte, 2048)
  36. total, err := conn.Read(buffer)
  37. if err != nil {
  38. commonCfg.GlobalLogger.Error("读取数据错误:", err)
  39. continue
  40. }
  41. // 将JSON转换为结构体
  42. var timeWindow ent.TimeWindow
  43. err = json.Unmarshal(buffer[:total], &timeWindow)
  44. if err != nil {
  45. commonCfg.GlobalLogger.Error("解析Json时出错:", err)
  46. continue
  47. }
  48. //commonCfg.GlobalLogger.Info("接收到时间窗口:",timeWindow)
  49. util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
  50. prepareTimeWindowProducerQueueMutex.Unlock()
  51. }
  52. }