accept_window.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package service
  2. import (
  3. "cicv-data-closedloop/aarch64/jili/common/service"
  4. "cicv-data-closedloop/aarch64/jili/slave/package/config"
  5. "cicv-data-closedloop/common/config/c_log"
  6. "cicv-data-closedloop/common/entity"
  7. "context"
  8. "encoding/json"
  9. "sync"
  10. )
  11. func PrepareTimeWindowProducerQueue() {
  12. var prepareTimeWindowProducerQueueMutex sync.Mutex
  13. ctx, cancel := context.WithCancel(context.Background())
  14. // 处理退出信号
  15. go func() {
  16. select {
  17. case signal := <-service.ChannelKillWindowProducer:
  18. if signal == 1 {
  19. cancel()
  20. config.TcpListener.Close()
  21. service.AddKillTimes("3")
  22. return
  23. }
  24. }
  25. }()
  26. for {
  27. select {
  28. case <-ctx.Done():
  29. return
  30. default:
  31. conn, err := config.TcpListener.Accept()
  32. if err != nil {
  33. select {
  34. case <-ctx.Done():
  35. return
  36. default:
  37. c_log.GlobalLogger.Error("接受连接错误:", err)
  38. continue
  39. }
  40. }
  41. prepareTimeWindowProducerQueueMutex.Lock()
  42. buffer := make([]byte, 2048)
  43. total, err := conn.Read(buffer)
  44. if err != nil {
  45. c_log.GlobalLogger.Error("读取数据错误:", err)
  46. continue
  47. }
  48. var timeWindow entity.TimeWindow
  49. err = json.Unmarshal(buffer[:total], &timeWindow)
  50. if err != nil {
  51. c_log.GlobalLogger.Error("解析Json时出错:", err)
  52. continue
  53. }
  54. entity.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
  55. prepareTimeWindowProducerQueueMutex.Unlock()
  56. }
  57. }
  58. }