time_window.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package entity
  2. import (
  3. "encoding/json"
  4. "sync"
  5. "time"
  6. )
  7. var (
  8. TimeWindowProducerQueue []TimeWindow
  9. TimeWindowProducerQueueMutex sync.RWMutex
  10. TimeWindowConsumerQueue []TimeWindow
  11. TimeWindowConsumerQueueMutex sync.RWMutex
  12. Subscriber1Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  13. Subscriber1TimeMutex sync.Mutex
  14. Subscriber2Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  15. Subscriber2TimeMutex sync.Mutex
  16. Subscriber4Time = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  17. Subscriber4TimeMutex sync.Mutex
  18. TcpSendTime = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  19. TcpSendTimeMutex sync.Mutex
  20. )
// TimeWindow is the element type of TimeWindowProducerQueue and
// TimeWindowConsumerQueue and the value serialized by TimeWindowToJson.
// Every field is exported and carries a JSON tag equal to its Go name, so the
// JSON keys are the field names as written here.
//
// NOTE(review): FaultTime, TimeWindowBegin and TimeWindowEnd are plain
// strings; the timestamp layout is not visible in this file. CanUpload is a
// string rather than a bool, and the unit of Length is not shown — confirm
// all three with the code that fills this struct.
type TimeWindow struct {
	FaultTime       string   `json:"FaultTime"`
	TimeWindowBegin string   `json:"TimeWindowBegin"`
	TimeWindowEnd   string   `json:"TimeWindowEnd"`
	Labels          []string `json:"Labels"`
	TriggerIds      []string `json:"TriggerIds"`
	Length          int      `json:"Length"`
	CanUpload       string   `json:"CanUpload"`
	MasterTopics    []string `json:"MasterTopics"`
	SlaveTopics     []string `json:"SlaveTopics"`
}
  32. func RefreshTcpSendTime() {
  33. TcpSendTimeMutex.Lock()
  34. TcpSendTime = time.Now()
  35. TcpSendTimeMutex.Unlock()
  36. }
  37. func AddTimeWindowToTimeWindowProducerQueue(window TimeWindow) {
  38. TimeWindowProducerQueueMutex.RLock()
  39. {
  40. TimeWindowProducerQueue = append(TimeWindowProducerQueue, window)
  41. }
  42. TimeWindowProducerQueueMutex.RUnlock()
  43. }
  44. func AddTimeWindowToTimeWindowConsumerQueue(window TimeWindow) {
  45. TimeWindowConsumerQueueMutex.RLock()
  46. {
  47. TimeWindowConsumerQueue = append(TimeWindowConsumerQueue, window)
  48. }
  49. TimeWindowConsumerQueueMutex.RUnlock()
  50. }
  51. func RemoveHeadOfdTimeWindowProducerQueue() {
  52. TimeWindowProducerQueueMutex.RLock()
  53. {
  54. TimeWindowProducerQueue = TimeWindowProducerQueue[1:]
  55. }
  56. TimeWindowProducerQueueMutex.RUnlock()
  57. }
  58. func RemoveHeaOfdTimeWindowConsumerQueue() {
  59. TimeWindowConsumerQueueMutex.RLock()
  60. {
  61. TimeWindowConsumerQueue = TimeWindowConsumerQueue[1:]
  62. }
  63. TimeWindowConsumerQueueMutex.RUnlock()
  64. }
  65. // GetLastTimeWindow 获取最后一个时间窗口
  66. func GetLastTimeWindow() *TimeWindow {
  67. var lastTimeWindow *TimeWindow // 获取最后一个时间窗口
  68. if len(TimeWindowProducerQueue) > 0 {
  69. lastTimeWindow = &TimeWindowProducerQueue[len(TimeWindowProducerQueue)-1]
  70. }
  71. return lastTimeWindow
  72. }
  73. func TimeWindowToJson(msg TimeWindow) (string, error) {
  74. jsonData, err := json.Marshal(msg)
  75. if err != nil {
  76. return "", err
  77. }
  78. return string(jsonData), nil
  79. }