time_window.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package entity
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. var (
  7. TimeWindowProducerQueue []TimeWindow
  8. TimeWindowProducerQueueMutex sync.RWMutex
  9. TimeWindowConsumerQueue []TimeWindow
  10. TimeWindowConsumerQueueMutex sync.RWMutex
  11. Subscriber0Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  12. Subscriber0TimeMutex sync.Mutex
  13. Subscriber2Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  14. Subscriber2TimeMutex sync.Mutex
  15. Subscriber3Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  16. Subscriber3TimeMutex sync.Mutex
  17. TcpSendTime = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  18. TcpSendTimeMutex sync.Mutex
  19. )
  20. type TimeWindow struct {
  21. FaultTime string `json:"FaultTime"`
  22. TimeWindowBegin string `json:"TimeWindowBegin"`
  23. TimeWindowEnd string `json:"TimeWindowEnd"`
  24. Labels []string `json:"Labels"`
  25. TriggerIds []string `json:"TriggerIds"`
  26. Length int `json:"Length"`
  27. CanUpload string `json:"CanUpload"`
  28. MasterTopics []string `json:"MasterTopics"`
  29. SlaveTopics []string `json:"SlaveTopics"`
  30. }
  31. func RefreshTcpSendTime() {
  32. TcpSendTimeMutex.Lock()
  33. TcpSendTime = time.Now()
  34. TcpSendTimeMutex.Unlock()
  35. }
  36. func AddTimeWindowToTimeWindowProducerQueue(window TimeWindow) {
  37. TimeWindowProducerQueueMutex.RLock()
  38. {
  39. TimeWindowProducerQueue = append(TimeWindowProducerQueue, window)
  40. }
  41. TimeWindowProducerQueueMutex.RUnlock()
  42. }
  43. func AddTimeWindowToTimeWindowConsumerQueue(window TimeWindow) {
  44. TimeWindowConsumerQueueMutex.RLock()
  45. {
  46. TimeWindowConsumerQueue = append(TimeWindowConsumerQueue, window)
  47. }
  48. TimeWindowConsumerQueueMutex.RUnlock()
  49. }
  50. func RemoveHeadOfdTimeWindowProducerQueue() {
  51. TimeWindowProducerQueueMutex.RLock()
  52. {
  53. TimeWindowProducerQueue = TimeWindowProducerQueue[1:]
  54. }
  55. TimeWindowProducerQueueMutex.RUnlock()
  56. }
  57. func RemoveHeaOfdTimeWindowConsumerQueue() {
  58. TimeWindowConsumerQueueMutex.RLock()
  59. {
  60. TimeWindowConsumerQueue = TimeWindowConsumerQueue[1:]
  61. }
  62. TimeWindowConsumerQueueMutex.RUnlock()
  63. }
  64. // GetLastTimeWindow 获取最后一个时间窗口
  65. func GetLastTimeWindow() *TimeWindow {
  66. var lastTimeWindow *TimeWindow // 获取最后一个时间窗口
  67. if len(TimeWindowProducerQueue) > 0 {
  68. lastTimeWindow = &TimeWindowProducerQueue[len(TimeWindowProducerQueue)-1]
  69. }
  70. return lastTimeWindow
  71. }