time_window.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package entity
  2. import (
  3. "encoding/json"
  4. "sync"
  5. "time"
  6. )
  7. var (
  8. TimeWindowProducerQueue []TimeWindow
  9. TimeWindowProducerQueueMutex sync.RWMutex
  10. TimeWindowConsumerQueue []TimeWindow
  11. TimeWindowConsumerQueueMutex sync.RWMutex
  12. Subscriber0Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  13. Subscriber0TimeMutex sync.Mutex
  14. Subscriber1Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  15. Subscriber1TimeMutex sync.Mutex
  16. Subscriber2Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  17. Subscriber2TimeMutex sync.Mutex
  18. Subscriber3Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  19. Subscriber3TimeMutex sync.Mutex
  20. Subscriber4Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  21. Subscriber4TimeMutex sync.Mutex
  22. Subscriber5Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  23. Subscriber5TimeMutex sync.Mutex
  24. TcpSendTime = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  25. TcpSendTimeMutex sync.Mutex
  26. )
  27. type TimeWindow struct {
  28. FaultTime string `json:"FaultTime"`
  29. TimeWindowBegin string `json:"TimeWindowBegin"`
  30. TimeWindowEnd string `json:"TimeWindowEnd"`
  31. Labels []string `json:"Labels"`
  32. TriggerIds []string `json:"TriggerIds"`
  33. Length int `json:"Length"`
  34. CanUpload string `json:"CanUpload"`
  35. MasterTopics []string `json:"MasterTopics"`
  36. SlaveTopics []string `json:"SlaveTopics"`
  37. }
  38. func RefreshTcpSendTime() {
  39. TcpSendTimeMutex.Lock()
  40. TcpSendTime = time.Now()
  41. TcpSendTimeMutex.Unlock()
  42. }
  43. func AddTimeWindowToTimeWindowProducerQueue(window TimeWindow) {
  44. TimeWindowProducerQueueMutex.RLock()
  45. {
  46. TimeWindowProducerQueue = append(TimeWindowProducerQueue, window)
  47. }
  48. TimeWindowProducerQueueMutex.RUnlock()
  49. }
  50. func AddTimeWindowToTimeWindowConsumerQueue(window TimeWindow) {
  51. TimeWindowConsumerQueueMutex.RLock()
  52. {
  53. TimeWindowConsumerQueue = append(TimeWindowConsumerQueue, window)
  54. }
  55. TimeWindowConsumerQueueMutex.RUnlock()
  56. }
  57. func RemoveHeadOfTimeWindowProducerQueue() {
  58. TimeWindowProducerQueueMutex.RLock()
  59. {
  60. TimeWindowProducerQueue = TimeWindowProducerQueue[1:]
  61. }
  62. TimeWindowProducerQueueMutex.RUnlock()
  63. }
  64. func RemoveHeadOfTimeWindowConsumerQueue() {
  65. TimeWindowConsumerQueueMutex.RLock()
  66. {
  67. TimeWindowConsumerQueue = TimeWindowConsumerQueue[1:]
  68. }
  69. TimeWindowConsumerQueueMutex.RUnlock()
  70. }
  71. // GetLastTimeWindow 获取最后一个时间窗口
  72. func GetLastTimeWindow() *TimeWindow {
  73. var lastTimeWindow *TimeWindow // 获取最后一个时间窗口
  74. if len(TimeWindowProducerQueue) > 0 {
  75. lastTimeWindow = &TimeWindowProducerQueue[len(TimeWindowProducerQueue)-1]
  76. }
  77. return lastTimeWindow
  78. }
  79. func TimeWindowToJson(msg TimeWindow) (string, error) {
  80. jsonData, err := json.Marshal(msg)
  81. if err != nil {
  82. return "", err
  83. }
  84. return string(jsonData), nil
  85. }