// time_window.go (2.2 KB)
// The run of digits originally in this header was the code viewer's
// line-number gutter (1 through 86), not file content.

  1. package entity
  2. import (
  3. "encoding/json"
  4. "sync"
  5. "time"
  6. )
  7. var (
  8. TimeWindowProducerQueue []TimeWindow
  9. TimeWindowProducerQueueMutex sync.RWMutex
  10. TimeWindowProducerChannel = make(chan TimeWindow)
  11. TimeWindowConsumerQueue []TimeWindow
  12. TimeWindowConsumerQueueMutex sync.RWMutex
  13. TimeWindowConsumerChannel = make(chan TimeWindow)
  14. TcpSendTime = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  15. TcpSendTimeMutex sync.Mutex
  16. )
// TimeWindow is one time-window record. Its JSON form uses the key names
// given in the struct tags, which match the Go field names exactly.
//
// NOTE(review): the time-related fields are plain strings; their expected
// layout is not visible in this file and should be confirmed with callers.
type TimeWindow struct {
	FaultTime       string   `json:"FaultTime"`       // fault time, kept as a string
	TimeWindowBegin string   `json:"TimeWindowBegin"` // start of the window, kept as a string
	TimeWindowEnd   string   `json:"TimeWindowEnd"`   // end of the window, kept as a string
	Labels          []string `json:"Labels"`          // labels attached to the window
	TriggerIds      []string `json:"TriggerIds"`      // trigger identifiers attached to the window
	Length          int      `json:"Length"`          // window length; unit not stated here — TODO confirm
	CanUpload       string   `json:"CanUpload"`       // upload flag stored as a string; accepted values not visible here
	MasterTopics    []string `json:"MasterTopics"`    // topic names for the "master" side — presumably; verify against callers
	SlaveTopics     []string `json:"SlaveTopics"`     // topic names for the "slave" side — presumably; verify against callers
}
  28. func RefreshTcpSendTime() {
  29. TcpSendTimeMutex.Lock()
  30. TcpSendTime = time.Now()
  31. TcpSendTimeMutex.Unlock()
  32. }
  33. func AddTimeWindowToTimeWindowProducerQueue(window TimeWindow) {
  34. TimeWindowProducerQueueMutex.RLock()
  35. {
  36. TimeWindowProducerQueue = append(TimeWindowProducerQueue, window)
  37. }
  38. TimeWindowProducerQueueMutex.RUnlock()
  39. }
  40. func AddTimeWindowToTimeWindowConsumerQueue(window TimeWindow) {
  41. TimeWindowConsumerQueueMutex.RLock()
  42. {
  43. TimeWindowConsumerQueue = append(TimeWindowConsumerQueue, window)
  44. }
  45. TimeWindowConsumerQueueMutex.RUnlock()
  46. }
  47. func RemoveHeadOfTimeWindowProducerQueue() {
  48. TimeWindowProducerQueueMutex.RLock()
  49. {
  50. TimeWindowProducerQueue = TimeWindowProducerQueue[1:]
  51. }
  52. TimeWindowProducerQueueMutex.RUnlock()
  53. }
  54. func RemoveHeadOfTimeWindowConsumerQueue() {
  55. TimeWindowConsumerQueueMutex.RLock()
  56. {
  57. TimeWindowConsumerQueue = TimeWindowConsumerQueue[1:]
  58. }
  59. TimeWindowConsumerQueueMutex.RUnlock()
  60. }
  61. // GetLastTimeWindow 获取最后一个时间窗口
  62. func GetLastTimeWindow() *TimeWindow {
  63. var lastTimeWindow *TimeWindow // 获取最后一个时间窗口
  64. if len(TimeWindowProducerQueue) > 0 {
  65. lastTimeWindow = &TimeWindowProducerQueue[len(TimeWindowProducerQueue)-1]
  66. }
  67. return lastTimeWindow
  68. }
  69. func TimeWindowToJson(msg TimeWindow) (string, error) {
  70. jsonData, err := json.Marshal(msg)
  71. if err != nil {
  72. return "", err
  73. }
  74. return string(jsonData), nil
  75. }