produce_window.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pji/common/config"
  4. commonService "cicv-data-closedloop/aarch64/pji/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pji/master/package/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. "cicv-data-closedloop/common/entity"
  8. commonUtil "cicv-data-closedloop/common/util"
  9. "github.com/bluenviron/goroslib/v2"
  10. "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
  11. "sync"
  12. "time"
  13. )
  14. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  15. func PrepareTimeWindowProducerQueue() {
  16. var err error
  17. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  18. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  19. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  20. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  21. for i, topic := range commonConfig.SubscribeTopics {
  22. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  23. if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
  24. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  25. Node: commonConfig.RosNode,
  26. Topic: topic,
  27. Callback: func(data *std_msgs.UInt8) {
  28. subscribersTimeMutexes[i].Lock()
  29. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  30. subscribersMutexes[i].Lock()
  31. faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
  32. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  33. var faultLabel string
  34. for _, f := range masterConfig.RuleOfObstacleDetection {
  35. faultLabel = f(data)
  36. if faultLabel != "" {
  37. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  38. subscribersTimes[i] = time.Now()
  39. break
  40. }
  41. }
  42. subscribersMutexes[i].Unlock()
  43. }
  44. subscribersTimeMutexes[i].Unlock()
  45. },
  46. })
  47. }
  48. if err != nil {
  49. c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  50. //TODO 如何回传日志
  51. continue
  52. }
  53. }
  54. select {
  55. case signal := <-commonService.ChannelKillSubscriber:
  56. if signal == 1 {
  57. commonConfig.RosNode.Close()
  58. commonService.AddKillTimes("3")
  59. return
  60. }
  61. }
  62. }
  63. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) {
  64. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  65. if lastTimeWindow == nil || commonUtil.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  66. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  67. newTimeWindow := entity.TimeWindow{
  68. FaultTime: faultHappenTime,
  69. TimeWindowBegin: commonUtil.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  70. TimeWindowEnd: commonUtil.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  71. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  72. Labels: []string{faultLabel},
  73. MasterTopics: masterTopics,
  74. SlaveTopics: slaveTopics,
  75. }
  76. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  77. entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  78. } else {
  79. // 2-2 如果在旧故障窗口内
  80. entity.TimeWindowProducerQueueMutex.RLock()
  81. defer entity.TimeWindowProducerQueueMutex.RUnlock()
  82. // 2-2-1 更新故障窗口end时间
  83. maxEnd := commonUtil.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  84. expectEnd := commonUtil.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  85. if commonUtil.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  86. lastTimeWindow.TimeWindowEnd = maxEnd
  87. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  88. } else {
  89. if commonUtil.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  90. lastTimeWindow.TimeWindowEnd = expectEnd
  91. lastTimeWindow.Length = commonUtil.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  92. }
  93. }
  94. // 2-2-2 更新label
  95. labels := lastTimeWindow.Labels
  96. lastTimeWindow.Labels = commonUtil.AppendIfNotExists(labels, faultLabel)
  97. // 2-2-3 更新 topic
  98. sourceMasterTopics := lastTimeWindow.MasterTopics
  99. lastTimeWindow.MasterTopics = commonUtil.MergeSlice(sourceMasterTopics, masterTopics)
  100. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  101. lastTimeWindow.SlaveTopics = commonUtil.MergeSlice(sourceSlaveTopics, slaveTopics)
  102. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  103. }
  104. }
  105. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  106. // 获取所有需要采集的topic
  107. var faultCodeTopics []string
  108. for _, code := range commonConfig.CloudConfig.Triggers {
  109. if code.Label == faultLabel {
  110. faultCodeTopics = code.Topics
  111. }
  112. }
  113. return faultCodeTopics, nil
  114. }