// produce_window.go

  1. package svc
  2. import (
  3. "cicv-data-closedloop/pji/common/cfg"
  4. commonEntity "cicv-data-closedloop/pji/common/ent"
  5. "cicv-data-closedloop/pji/common/global"
  6. "cicv-data-closedloop/pji/common/log"
  7. commonService "cicv-data-closedloop/pji/common/svc"
  8. "cicv-data-closedloop/pji/common/util"
  9. masterConfig "cicv-data-closedloop/pji/master/pkg/cfg"
  10. "cicv-data-closedloop/pji_msgs"
  11. "github.com/bluenviron/goroslib/v2"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
  15. "os"
  16. "sync"
  17. )
  18. var (
  19. subscriber0Mutex sync.Mutex
  20. subscriber1Mutex sync.Mutex
  21. subscriber2Mutex sync.Mutex
  22. subscriber3Mutex sync.Mutex
  23. subscriber4Mutex sync.Mutex
  24. subscriber5Mutex sync.Mutex
  25. )
  26. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  27. func PrepareTimeWindowProducerQueue() {
  28. //log.GlobalLogger.Info("创建订阅者订阅主题", masterConfig.TopicOfOdom)
  29. //subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  30. // Node: cfg.RosNode,
  31. // Topic: masterConfig.TopicOfOdom,
  32. // Callback: func(data *nav_msgs.Odometry) {
  33. // //log.GlobalLogger.Info("话题", masterConfig.TopicOfOdom, "接收到数据", data)
  34. // subscriber0Mutex.Lock()
  35. // {
  36. // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  37. // lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  38. // var faultLabel string
  39. // for _, f := range masterConfig.RuleOfOdom {
  40. // faultLabel = f(data)
  41. // if faultLabel != "" {
  42. // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  43. // break
  44. // }
  45. // }
  46. // }
  47. // subscriber0Mutex.Unlock()
  48. // },
  49. //})
  50. //if err != nil {
  51. // log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfOdom, "发生故障:", err)
  52. // os.Exit(-1)
  53. //}
  54. log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfObstacleDetection)
  55. subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  56. Node: cfg.RosNode,
  57. Topic: masterConfig.TopicOfObstacleDetection,
  58. Callback: func(data *std_msgs.UInt8) {
  59. //log.GlobalLogger.Info("话题", masterConfig.TopicOfObstacleDetection, "接收到数据", data)
  60. subscriber1Mutex.Lock()
  61. {
  62. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  63. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  64. var faultLabel string
  65. for _, f := range masterConfig.RuleOfObstacleDetection {
  66. faultLabel = f(data)
  67. if faultLabel != "" {
  68. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  69. break
  70. }
  71. }
  72. }
  73. subscriber1Mutex.Unlock()
  74. },
  75. })
  76. if err != nil {
  77. log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  78. os.Exit(-1)
  79. }
  80. log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfSysInfo)
  81. subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  82. Node: cfg.RosNode,
  83. Topic: masterConfig.TopicOfSysInfo,
  84. Callback: func(data *pji_msgs.SysInfo) {
  85. //log.GlobalLogger.Info("话题", masterConfig.TopicOfSysInfo, "接收到数据", data)
  86. subscriber2Mutex.Lock()
  87. {
  88. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  89. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  90. var faultLabel string
  91. for _, f := range masterConfig.RuleOfSysInfo {
  92. faultLabel = f(data)
  93. if faultLabel != "" {
  94. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  95. break
  96. }
  97. }
  98. }
  99. subscriber2Mutex.Unlock()
  100. },
  101. })
  102. if err != nil {
  103. log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  104. os.Exit(-1)
  105. }
  106. log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfLocateInfo)
  107. subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  108. Node: cfg.RosNode,
  109. Topic: masterConfig.TopicOfLocateInfo,
  110. Callback: func(data *pji_msgs.LocateInfo) {
  111. //log.GlobalLogger.Info("话题", masterConfig.TopicOfLocateInfo, "接收到数据", data)
  112. subscriber3Mutex.Lock()
  113. {
  114. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  115. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  116. var faultLabel string
  117. for _, f := range masterConfig.RuleOfLocateInfo {
  118. faultLabel = f(data)
  119. if faultLabel != "" {
  120. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  121. break
  122. }
  123. }
  124. }
  125. subscriber3Mutex.Unlock()
  126. },
  127. })
  128. if err != nil {
  129. log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  130. os.Exit(-1)
  131. }
  132. log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfImu)
  133. subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  134. Node: cfg.RosNode,
  135. Topic: masterConfig.TopicOfImu,
  136. Callback: func(data *sensor_msgs.Imu) {
  137. //log.GlobalLogger.Info("话题", masterConfig.TopicOfImu, "接收到数据", data)
  138. subscriber4Mutex.Lock()
  139. {
  140. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  141. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  142. var faultLabel string
  143. for _, f := range masterConfig.RuleOfImu {
  144. faultLabel = f(data)
  145. if faultLabel != "" {
  146. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  147. break
  148. }
  149. }
  150. }
  151. subscriber4Mutex.Unlock()
  152. },
  153. })
  154. if err != nil {
  155. log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  156. os.Exit(-1)
  157. }
  158. log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfDiagnostics)
  159. subscriber5, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  160. Node: cfg.RosNode,
  161. Topic: masterConfig.TopicOfDiagnostics,
  162. Callback: func(data *diagnostic_msgs.DiagnosticArray) {
  163. //log.GlobalLogger.Info("话题", masterConfig.TopicOfDiagnostics, "接收到数据", data)
  164. subscriber5Mutex.Lock()
  165. {
  166. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  167. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  168. var faultLabel string
  169. for _, f := range masterConfig.RuleOfDiagnostics {
  170. faultLabel = f(data)
  171. if faultLabel != "" {
  172. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  173. break
  174. }
  175. }
  176. }
  177. subscriber5Mutex.Unlock()
  178. },
  179. })
  180. if err != nil {
  181. log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  182. os.Exit(-1)
  183. }
  184. select {
  185. case signal := <-commonService.ChannelKillSubscriber:
  186. if signal == 1 {
  187. //subscriber0.Close()
  188. subscriber1.Close()
  189. subscriber2.Close()
  190. subscriber3.Close()
  191. subscriber4.Close()
  192. subscriber5.Close()
  193. cfg.RosNode.Close()
  194. commonService.AddKillTimes("3")
  195. return
  196. }
  197. }
  198. }
  199. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  200. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  201. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  202. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  203. newTimeWindow := commonEntity.TimeWindow{
  204. FaultTime: faultHappenTime,
  205. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -cfg.PlatformConfig.TaskBeforeTime),
  206. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, cfg.PlatformConfig.TaskAfterTime),
  207. Length: cfg.PlatformConfig.TaskBeforeTime + cfg.PlatformConfig.TaskAfterTime + 1,
  208. Labels: []string{faultLabel},
  209. MasterTopics: masterTopics,
  210. SlaveTopics: slaveTopics,
  211. }
  212. log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  213. util.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  214. } else {
  215. // 2-2 如果在旧故障窗口内
  216. global.TimeWindowProducerQueueMutex.RLock()
  217. defer global.TimeWindowProducerQueueMutex.RUnlock()
  218. // 2-2-1 更新故障窗口end时间
  219. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, cfg.PlatformConfig.TaskMaxTime)
  220. expectEnd := util.TimeCustomChange(faultHappenTime, cfg.PlatformConfig.TaskAfterTime)
  221. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  222. lastTimeWindow.TimeWindowEnd = maxEnd
  223. lastTimeWindow.Length = cfg.PlatformConfig.TaskMaxTime
  224. } else {
  225. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  226. lastTimeWindow.TimeWindowEnd = expectEnd
  227. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  228. }
  229. }
  230. // 2-2-2 更新label
  231. labels := lastTimeWindow.Labels
  232. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  233. // 2-2-3 更新 topic
  234. sourceMasterTopics := lastTimeWindow.MasterTopics
  235. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  236. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  237. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  238. log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  239. }
  240. }
  241. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  242. // 获取所有需要采集的topic
  243. var faultCodeTopics []string
  244. for _, code := range cfg.CloudConfig.Triggers {
  245. if code.Label == faultLabel {
  246. faultCodeTopics = code.Topics
  247. }
  248. }
  249. return faultCodeTopics, nil
  250. //// 根据不同节点采集的topic进行分配采集
  251. //for _, acceptTopic := range faultCodeTopics {
  252. // for _, host := range cfg.CloudConfig.Hosts {
  253. // for _, topic := range host.Topics {
  254. // if host.Name == cfg.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  255. // masterTopics = append(masterTopics, acceptTopic)
  256. // }
  257. // }
  258. // }
  259. //}
  260. //return masterTopics, slaveTopics
  261. }