time_window.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package entity
  2. import (
  3. "encoding/json"
  4. "sync"
  5. "time"
  6. )
  7. var (
  8. TimeWindowProducerQueue []TimeWindow
  9. TimeWindowProducerQueueMutex sync.RWMutex
  10. TimeWindowProducerChannel = make(chan TimeWindow)
  11. TimeWindowConsumerQueue []TimeWindow
  12. TimeWindowConsumerQueueMutex sync.RWMutex
  13. TimeWindowConsumerChannel = make(chan TimeWindow)
  14. TcpSendTime = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
  15. TcpSendTimeMutex sync.Mutex
  16. ProcessingFlag = false // 是否有数据正在被处理
  17. )
  18. type TimeWindow struct {
  19. FaultTime string `json:"FaultTime"`
  20. TimeWindowBegin string `json:"TimeWindowBegin"`
  21. TimeWindowEnd string `json:"TimeWindowEnd"`
  22. Labels []string `json:"Labels"`
  23. TriggerIds []string `json:"TriggerIds"`
  24. Length int `json:"Length"`
  25. CanUpload string `json:"CanUpload"`
  26. MasterTopics []string `json:"MasterTopics"`
  27. SlaveTopics []string `json:"SlaveTopics"`
  28. }
  29. func RefreshTcpSendTime() {
  30. TcpSendTimeMutex.Lock()
  31. TcpSendTime = time.Now()
  32. TcpSendTimeMutex.Unlock()
  33. }
  34. func AddTimeWindowToTimeWindowProducerQueue(window TimeWindow) {
  35. TimeWindowProducerQueueMutex.RLock()
  36. {
  37. TimeWindowProducerQueue = append(TimeWindowProducerQueue, window)
  38. }
  39. TimeWindowProducerQueueMutex.RUnlock()
  40. }
  41. func AddTimeWindowToTimeWindowConsumerQueue(window TimeWindow) {
  42. TimeWindowConsumerQueueMutex.RLock()
  43. {
  44. TimeWindowConsumerQueue = append(TimeWindowConsumerQueue, window)
  45. }
  46. TimeWindowConsumerQueueMutex.RUnlock()
  47. }
  48. func RemoveHeadOfTimeWindowProducerQueue() {
  49. TimeWindowProducerQueueMutex.RLock()
  50. {
  51. TimeWindowProducerQueue = TimeWindowProducerQueue[1:]
  52. }
  53. TimeWindowProducerQueueMutex.RUnlock()
  54. }
  55. func RemoveHeadOfTimeWindowConsumerQueue() {
  56. TimeWindowConsumerQueueMutex.RLock()
  57. {
  58. TimeWindowConsumerQueue = TimeWindowConsumerQueue[1:]
  59. }
  60. TimeWindowConsumerQueueMutex.RUnlock()
  61. }
  62. // GetLastTimeWindow 获取最后一个时间窗口
  63. func GetLastTimeWindow() *TimeWindow {
  64. var lastTimeWindow *TimeWindow // 获取最后一个时间窗口
  65. if len(TimeWindowProducerQueue) > 0 {
  66. lastTimeWindow = &TimeWindowProducerQueue[len(TimeWindowProducerQueue)-1]
  67. }
  68. return lastTimeWindow
  69. }
  70. func TimeWindowToJson(msg TimeWindow) (string, error) {
  71. jsonData, err := json.Marshal(msg)
  72. if err != nil {
  73. return "", err
  74. }
  75. return string(jsonData), nil
  76. }