produce_window.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package svc
  2. import (
  3. "cicv-data-closedloop/common/config/c_log"
  4. commonUtil "cicv-data-closedloop/common/util"
  5. commonConfig "cicv-data-closedloop/pji/common/cfg"
  6. commonEntity "cicv-data-closedloop/pji/common/ent"
  7. "cicv-data-closedloop/pji/common/global"
  8. commonService "cicv-data-closedloop/pji/common/svc"
  9. "cicv-data-closedloop/pji/common/util"
  10. masterConfig "cicv-data-closedloop/pji/master/package/cfg"
  11. "github.com/bluenviron/goroslib/v2"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
  13. "sync"
  14. "time"
  15. )
  16. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  17. func PrepareTimeWindowProducerQueue() {
  18. var err error
  19. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  20. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  21. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  22. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  23. for i, topic := range commonConfig.SubscribeTopics {
  24. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  25. if topic == masterConfig.TopicOfObstacleDetection {
  26. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  27. Node: commonConfig.RosNode,
  28. Topic: topic,
  29. Callback: func(data *std_msgs.UInt8) {
  30. if len(masterConfig.RuleOfObstacleDetection) == 0 {
  31. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  32. return
  33. }
  34. subscribersTimeMutexes[i].Lock()
  35. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  36. subscribersMutexes[i].Lock()
  37. faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
  38. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  39. var faultLabel string
  40. for _, f := range masterConfig.RuleOfObstacleDetection {
  41. faultLabel = f(data)
  42. if faultLabel != "" {
  43. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  44. break
  45. }
  46. }
  47. subscribersMutexes[i].Unlock()
  48. }
  49. subscribersTimeMutexes[i].Unlock()
  50. },
  51. })
  52. }
  53. if err != nil {
  54. c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  55. //TODO 如何回传日志
  56. continue
  57. }
  58. }
  59. select {
  60. case signal := <-commonService.ChannelKillSubscriber:
  61. if signal == 1 {
  62. commonConfig.RosNode.Close()
  63. commonService.AddKillTimes("3")
  64. return
  65. }
  66. }
  67. }
  68. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  69. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  70. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  71. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  72. newTimeWindow := commonEntity.TimeWindow{
  73. FaultTime: faultHappenTime,
  74. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  75. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  76. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  77. Labels: []string{faultLabel},
  78. MasterTopics: masterTopics,
  79. SlaveTopics: slaveTopics,
  80. }
  81. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  82. util.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  83. } else {
  84. // 2-2 如果在旧故障窗口内
  85. global.TimeWindowProducerQueueMutex.RLock()
  86. defer global.TimeWindowProducerQueueMutex.RUnlock()
  87. // 2-2-1 更新故障窗口end时间
  88. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  89. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  90. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  91. lastTimeWindow.TimeWindowEnd = maxEnd
  92. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  93. } else {
  94. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  95. lastTimeWindow.TimeWindowEnd = expectEnd
  96. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  97. }
  98. }
  99. // 2-2-2 更新label
  100. labels := lastTimeWindow.Labels
  101. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  102. // 2-2-3 更新 topic
  103. sourceMasterTopics := lastTimeWindow.MasterTopics
  104. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  105. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  106. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  107. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  108. }
  109. }
  110. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  111. // 获取所有需要采集的topic
  112. var faultCodeTopics []string
  113. for _, code := range commonConfig.CloudConfig.Triggers {
  114. if code.Label == faultLabel {
  115. faultCodeTopics = code.Topics
  116. }
  117. }
  118. return faultCodeTopics, nil
  119. }
  120. func initCallbackFunc() {
  121. }