accept_window.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package svc
  2. import (
  3. "cicv-data-closedloop/kinglong/common/ent"
  4. "cicv-data-closedloop/kinglong/common/global"
  5. commonCfg "cicv-data-closedloop/kinglong/common/log"
  6. "cicv-data-closedloop/kinglong/common/svc"
  7. "cicv-data-closedloop/kinglong/common/util"
  8. slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
  9. "encoding/json"
  10. "sync"
  11. )
  12. var (
  13. prepareTimeWindowProducerQueueMutex sync.Mutex
  14. )
  15. func PrepareTimeWindowProducerQueue() {
  16. for {
  17. select {
  18. case signal := <-svc.ChannelKillWindowProducer:
  19. if signal == 1 {
  20. if len(global.TimeWindowConsumerQueue) == 0 {
  21. svc.AddKillTimes("3")
  22. return
  23. }
  24. }
  25. default:
  26. }
  27. // 等待新连接
  28. conn, err := slaveConfig.TcpListener.Accept()
  29. if err != nil {
  30. commonCfg.GlobalLogger.Error("接受连接错误:", err)
  31. continue
  32. }
  33. prepareTimeWindowProducerQueueMutex.Lock()
  34. {
  35. // 接收数据
  36. buffer := make([]byte, 2048)
  37. total, err := conn.Read(buffer)
  38. if err != nil {
  39. commonCfg.GlobalLogger.Error("读取数据错误:", err)
  40. continue
  41. }
  42. // 将JSON转换为结构体
  43. var timeWindow ent.TimeWindow
  44. err = json.Unmarshal(buffer[:total], &timeWindow)
  45. if err != nil {
  46. commonCfg.GlobalLogger.Error("解析Json时出错:", err)
  47. continue
  48. }
  49. //commonCfg.GlobalLogger.Info("接收到时间窗口:",timeWindow)
  50. util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
  51. }
  52. prepareTimeWindowProducerQueueMutex.Unlock()
  53. }
  54. }