produce_window.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pji/common/config"
  4. commonService "cicv-data-closedloop/aarch64/pji/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pji/master/package/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. "cicv-data-closedloop/common/entity"
  8. commonUtil "cicv-data-closedloop/common/util"
  9. "github.com/bluenviron/goroslib/v2"
  10. "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
  11. "sync"
  12. "time"
  13. )
  14. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  15. func PrepareTimeWindowProducerQueue() {
  16. var err error
  17. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  18. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  19. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  20. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  21. for i, topic := range commonConfig.SubscribeTopics {
  22. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  23. if topic == masterConfig.TopicOfObstacleDetection {
  24. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  25. Node: commonConfig.RosNode,
  26. Topic: topic,
  27. Callback: func(data *std_msgs.UInt8) {
  28. if len(masterConfig.RuleOfObstacleDetection) == 0 {
  29. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  30. return
  31. }
  32. subscribersTimeMutexes[i].Lock()
  33. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  34. subscribersMutexes[i].Lock()
  35. faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
  36. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  37. var faultLabel string
  38. for _, f := range masterConfig.RuleOfObstacleDetection {
  39. faultLabel = f(data)
  40. if faultLabel != "" {
  41. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  42. break
  43. }
  44. }
  45. subscribersMutexes[i].Unlock()
  46. }
  47. subscribersTimeMutexes[i].Unlock()
  48. },
  49. })
  50. }
  51. if err != nil {
  52. c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  53. //TODO 如何回传日志
  54. continue
  55. }
  56. }
  57. select {
  58. case signal := <-commonService.ChannelKillSubscriber:
  59. if signal == 1 {
  60. commonConfig.RosNode.Close()
  61. commonService.AddKillTimes("3")
  62. return
  63. }
  64. }
  65. }
  66. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) {
  67. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  68. if lastTimeWindow == nil || commonUtil.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  69. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  70. newTimeWindow := entity.TimeWindow{
  71. FaultTime: faultHappenTime,
  72. TimeWindowBegin: commonUtil.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  73. TimeWindowEnd: commonUtil.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  74. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  75. Labels: []string{faultLabel},
  76. MasterTopics: masterTopics,
  77. SlaveTopics: slaveTopics,
  78. }
  79. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  80. entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  81. } else {
  82. // 2-2 如果在旧故障窗口内
  83. entity.TimeWindowProducerQueueMutex.RLock()
  84. defer entity.TimeWindowProducerQueueMutex.RUnlock()
  85. // 2-2-1 更新故障窗口end时间
  86. maxEnd := commonUtil.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  87. expectEnd := commonUtil.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  88. if commonUtil.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  89. lastTimeWindow.TimeWindowEnd = maxEnd
  90. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  91. } else {
  92. if commonUtil.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  93. lastTimeWindow.TimeWindowEnd = expectEnd
  94. lastTimeWindow.Length = commonUtil.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  95. }
  96. }
  97. // 2-2-2 更新label
  98. labels := lastTimeWindow.Labels
  99. lastTimeWindow.Labels = commonUtil.AppendIfNotExists(labels, faultLabel)
  100. // 2-2-3 更新 topic
  101. sourceMasterTopics := lastTimeWindow.MasterTopics
  102. lastTimeWindow.MasterTopics = commonUtil.MergeSlice(sourceMasterTopics, masterTopics)
  103. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  104. lastTimeWindow.SlaveTopics = commonUtil.MergeSlice(sourceSlaveTopics, slaveTopics)
  105. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  106. }
  107. }
  108. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  109. // 获取所有需要采集的topic
  110. var faultCodeTopics []string
  111. for _, code := range commonConfig.CloudConfig.Triggers {
  112. if code.Label == faultLabel {
  113. faultCodeTopics = code.Topics
  114. }
  115. }
  116. return faultCodeTopics, nil
  117. }