accept_window.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package svc
  2. import (
  3. "cicv-data-closedloop/kinglong/common/ent"
  4. commonCfg "cicv-data-closedloop/kinglong/common/log"
  5. "cicv-data-closedloop/kinglong/common/svc"
  6. "cicv-data-closedloop/kinglong/common/util"
  7. slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
  8. "context"
  9. "encoding/json"
  10. "sync"
  11. )
  12. func PrepareTimeWindowProducerQueue() {
  13. var prepareTimeWindowProducerQueueMutex sync.Mutex
  14. ctx, cancel := context.WithCancel(context.Background())
  15. // 处理退出信号
  16. go func() {
  17. select {
  18. case signal := <-svc.ChannelKillWindowProducer:
  19. if signal == 1 {
  20. cancel()
  21. slaveConfig.TcpListener.Close()
  22. svc.AddKillTimes("3")
  23. return
  24. }
  25. }
  26. }()
  27. for {
  28. select {
  29. case <-ctx.Done():
  30. return
  31. default:
  32. conn, err := slaveConfig.TcpListener.Accept()
  33. if err != nil {
  34. select {
  35. case <-ctx.Done():
  36. return
  37. default:
  38. commonCfg.GlobalLogger.Error("接受连接错误:", err)
  39. continue
  40. }
  41. }
  42. prepareTimeWindowProducerQueueMutex.Lock()
  43. buffer := make([]byte, 2048)
  44. total, err := conn.Read(buffer)
  45. if err != nil {
  46. commonCfg.GlobalLogger.Error("读取数据错误:", err)
  47. continue
  48. }
  49. var timeWindow ent.TimeWindow
  50. err = json.Unmarshal(buffer[:total], &timeWindow)
  51. if err != nil {
  52. commonCfg.GlobalLogger.Error("解析Json时出错:", err)
  53. continue
  54. }
  55. util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
  56. prepareTimeWindowProducerQueueMutex.Unlock()
  57. }
  58. }
  59. }