produce_window.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package svc
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/package/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "github.com/bluenviron/goroslib/v2"
  11. "sync"
  12. "time"
  13. )
  14. var (
  15. m sync.RWMutex
  16. velocityX float64
  17. velocityY float64
  18. yaw float64
  19. )
  20. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  21. func PrepareTimeWindowProducerQueue() {
  22. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  23. c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfCicvLocation)
  24. var err error
  25. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  26. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  27. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  28. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  29. for i, topic := range commonConfig.SubscribeTopics {
  30. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  31. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  32. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  33. Node: commonConfig.RosNode,
  34. Topic: topic,
  35. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  36. // 更新共享变量
  37. m.RLock()
  38. velocityX = data.VelocityX
  39. velocityY = data.VelocityY
  40. yaw = data.Yaw
  41. m.RUnlock()
  42. if len(masterConfig.TopicOfCicvLocation) == 0 {
  43. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  44. return
  45. }
  46. subscribersTimeMutexes[i].Lock()
  47. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  48. subscribersMutexes[i].Lock()
  49. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  50. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  51. var faultLabel string
  52. for _, f := range masterConfig.RuleOfCicvLocation {
  53. faultLabel = f(data)
  54. if faultLabel != "" {
  55. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  56. break
  57. }
  58. }
  59. subscribersMutexes[i].Unlock()
  60. }
  61. subscribersTimeMutexes[i].Unlock()
  62. },
  63. })
  64. }
  65. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  66. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  67. Node: commonConfig.RosNode,
  68. Topic: topic,
  69. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  70. if len(masterConfig.TopicOfTpperception) == 0 {
  71. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  72. return
  73. }
  74. subscribersTimeMutexes[i].Lock()
  75. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  76. subscribersMutexes[i].Lock()
  77. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  78. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  79. var faultLabel string
  80. for _, f := range masterConfig.RuleOfTpperception {
  81. faultLabel = f(data, velocityX, velocityY, yaw)
  82. if faultLabel != "" {
  83. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  84. break
  85. }
  86. }
  87. subscribersMutexes[i].Unlock()
  88. }
  89. subscribersTimeMutexes[i].Unlock()
  90. },
  91. })
  92. }
  93. if topic == masterConfig.TopicOfDataRead && len(masterConfig.RuleOfDataRead) > 0 {
  94. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  95. Node: commonConfig.RosNode,
  96. Topic: topic,
  97. Callback: func(data *pjisuv_msgs.Retrieval) {
  98. if len(masterConfig.TopicOfDataRead) == 0 {
  99. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  100. return
  101. }
  102. subscribersTimeMutexes[i].Lock()
  103. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  104. subscribersMutexes[i].Lock()
  105. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  106. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  107. var faultLabel string
  108. for _, f := range masterConfig.RuleOfDataRead {
  109. faultLabel = f(data)
  110. if faultLabel != "" {
  111. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  112. subscribersTimes[i] = time.Now()
  113. break
  114. }
  115. }
  116. subscribersMutexes[i].Unlock()
  117. }
  118. subscribersTimeMutexes[i].Unlock()
  119. },
  120. })
  121. }
  122. if err != nil {
  123. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  124. //TODO 如何回传日志
  125. continue
  126. }
  127. }
  128. select {
  129. case signal := <-service.ChannelKillWindowProducer:
  130. if signal == 1 {
  131. commonConfig.RosNode.Close()
  132. service.AddKillTimes("3")
  133. return
  134. }
  135. }
  136. }
  137. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) {
  138. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  139. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  140. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  141. newTimeWindow := entity.TimeWindow{
  142. FaultTime: faultHappenTime,
  143. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  144. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  145. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  146. Labels: []string{faultLabel},
  147. MasterTopics: masterTopics,
  148. SlaveTopics: slaveTopics,
  149. }
  150. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  151. entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  152. } else {
  153. // 2-2 如果在旧故障窗口内
  154. entity.TimeWindowProducerQueueMutex.RLock()
  155. defer entity.TimeWindowProducerQueueMutex.RUnlock()
  156. // 2-2-1 更新故障窗口end时间
  157. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  158. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  159. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  160. lastTimeWindow.TimeWindowEnd = maxEnd
  161. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  162. } else {
  163. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  164. lastTimeWindow.TimeWindowEnd = expectEnd
  165. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  166. }
  167. }
  168. // 2-2-2 更新label
  169. labels := lastTimeWindow.Labels
  170. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  171. // 2-2-3 更新 topic
  172. sourceMasterTopics := lastTimeWindow.MasterTopics
  173. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  174. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  175. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  176. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  177. }
  178. }
  179. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  180. // 获取所有需要采集的topic
  181. var faultCodeTopics []string
  182. for _, code := range commonConfig.CloudConfig.Triggers {
  183. if code.Label == faultLabel {
  184. faultCodeTopics = code.Topics
  185. }
  186. }
  187. // 根据不同节点采集的topic进行分配采集
  188. for _, acceptTopic := range faultCodeTopics {
  189. for _, host := range commonConfig.CloudConfig.Hosts {
  190. for _, topic := range host.Topics {
  191. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  192. masterTopics = append(masterTopics, acceptTopic)
  193. }
  194. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  195. slaveTopics = append(slaveTopics, acceptTopic)
  196. }
  197. }
  198. }
  199. }
  200. return masterTopics, slaveTopics
  201. }