produce_window.go 10 KB


  1. package svc
  2. import (
  3. "cicv-data-closedloop/common/config/c_log"
  4. commonUtil "cicv-data-closedloop/common/util"
  5. "cicv-data-closedloop/pji/common/cfg"
  6. commonEntity "cicv-data-closedloop/pji/common/ent"
  7. "cicv-data-closedloop/pji/common/global"
  8. commonService "cicv-data-closedloop/pji/common/svc"
  9. "cicv-data-closedloop/pji/common/util"
  10. masterConfig "cicv-data-closedloop/pji/master/package/cfg"
  11. "cicv-data-closedloop/pji_msgs"
  12. "github.com/bluenviron/goroslib/v2"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
  16. "os"
  17. "sync"
  18. )
  19. var (
  20. subscriber0Mutex sync.Mutex
  21. subscriber1Mutex sync.Mutex
  22. subscriber2Mutex sync.Mutex
  23. subscriber3Mutex sync.Mutex
  24. subscriber4Mutex sync.Mutex
  25. subscriber5Mutex sync.Mutex
  26. )
  27. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  28. func PrepareTimeWindowProducerQueue() {
  29. //log.GlobalLogger.Info("创建订阅者订阅主题", masterConfig.TopicOfOdom)
  30. //subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  31. // Node: cfg.RosNode,
  32. // Topic: masterConfig.TopicOfOdom,
  33. // Callback: func(data *nav_msgs.Odometry) {
  34. // //log.GlobalLogger.Info("话题", masterConfig.TopicOfOdom, "接收到数据", data)
  35. // subscriber0Mutex.Lock()
  36. // {
  37. // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  38. // lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  39. // var faultLabel string
  40. // for _, f := range masterConfig.RuleOfOdom {
  41. // faultLabel = f(data)
  42. // if faultLabel != "" {
  43. // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  44. // break
  45. // }
  46. // }
  47. // }
  48. // subscriber0Mutex.Unlock()
  49. // },
  50. //})
  51. //if err != nil {
  52. // log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfOdom, "发生故障:", err)
  53. // os.Exit(-1)
  54. //}
  55. c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfObstacleDetection)
  56. subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  57. Node: cfg.RosNode,
  58. Topic: masterConfig.TopicOfObstacleDetection,
  59. Callback: func(data *std_msgs.UInt8) {
  60. //c_log.GlobalLogger.Info("话题", masterConfig.TopicOfObstacleDetection, "接收到数据", data)
  61. subscriber1Mutex.Lock()
  62. {
  63. faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
  64. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  65. var faultLabel string
  66. for _, f := range masterConfig.RuleOfObstacleDetection {
  67. faultLabel = f(data)
  68. if faultLabel != "" {
  69. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  70. break
  71. }
  72. }
  73. }
  74. subscriber1Mutex.Unlock()
  75. },
  76. })
  77. if err != nil {
  78. c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  79. os.Exit(-1)
  80. }
  81. c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfSysInfo)
  82. subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  83. Node: cfg.RosNode,
  84. Topic: masterConfig.TopicOfSysInfo,
  85. Callback: func(data *pji_msgs.SysInfo) {
  86. //c_log.GlobalLogger.Info("话题", masterConfig.TopicOfSysInfo, "接收到数据", data)
  87. subscriber2Mutex.Lock()
  88. {
  89. faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
  90. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  91. var faultLabel string
  92. for _, f := range masterConfig.RuleOfSysInfo {
  93. faultLabel = f(data)
  94. if faultLabel != "" {
  95. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  96. break
  97. }
  98. }
  99. }
  100. subscriber2Mutex.Unlock()
  101. },
  102. })
  103. if err != nil {
  104. c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  105. os.Exit(-1)
  106. }
  107. c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfLocateInfo)
  108. subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  109. Node: cfg.RosNode,
  110. Topic: masterConfig.TopicOfLocateInfo,
  111. Callback: func(data *pji_msgs.LocateInfo) {
  112. //c_log.GlobalLogger.Info("话题", masterConfig.TopicOfLocateInfo, "接收到数据", data)
  113. subscriber3Mutex.Lock()
  114. {
  115. faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
  116. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  117. var faultLabel string
  118. for _, f := range masterConfig.RuleOfLocateInfo {
  119. faultLabel = f(data)
  120. if faultLabel != "" {
  121. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  122. break
  123. }
  124. }
  125. }
  126. subscriber3Mutex.Unlock()
  127. },
  128. })
  129. if err != nil {
  130. c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  131. os.Exit(-1)
  132. }
  133. c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfImu)
  134. subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  135. Node: cfg.RosNode,
  136. Topic: masterConfig.TopicOfImu,
  137. Callback: func(data *sensor_msgs.Imu) {
  138. //c_log.GlobalLogger.Info("话题", masterConfig.TopicOfImu, "接收到数据", data)
  139. subscriber4Mutex.Lock()
  140. {
  141. faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
  142. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  143. var faultLabel string
  144. for _, f := range masterConfig.RuleOfImu {
  145. faultLabel = f(data)
  146. if faultLabel != "" {
  147. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  148. break
  149. }
  150. }
  151. }
  152. subscriber4Mutex.Unlock()
  153. },
  154. })
  155. if err != nil {
  156. c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  157. os.Exit(-1)
  158. }
  159. c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfDiagnostics)
  160. subscriber5, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  161. Node: cfg.RosNode,
  162. Topic: masterConfig.TopicOfDiagnostics,
  163. Callback: func(data *diagnostic_msgs.DiagnosticArray) {
  164. //c_log.GlobalLogger.Info("话题", masterConfig.TopicOfDiagnostics, "接收到数据", data)
  165. subscriber5Mutex.Lock()
  166. {
  167. faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
  168. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  169. var faultLabel string
  170. for _, f := range masterConfig.RuleOfDiagnostics {
  171. faultLabel = f(data)
  172. if faultLabel != "" {
  173. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  174. break
  175. }
  176. }
  177. }
  178. subscriber5Mutex.Unlock()
  179. },
  180. })
  181. if err != nil {
  182. c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  183. os.Exit(-1)
  184. }
  185. select {
  186. case signal := <-commonService.ChannelKillSubscriber:
  187. if signal == 1 {
  188. //subscriber0.Close()
  189. subscriber1.Close()
  190. subscriber2.Close()
  191. subscriber3.Close()
  192. subscriber4.Close()
  193. subscriber5.Close()
  194. cfg.RosNode.Close()
  195. commonService.AddKillTimes("3")
  196. return
  197. }
  198. }
  199. }
  200. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  201. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  202. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  203. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  204. newTimeWindow := commonEntity.TimeWindow{
  205. FaultTime: faultHappenTime,
  206. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -cfg.PlatformConfig.TaskBeforeTime),
  207. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, cfg.PlatformConfig.TaskAfterTime),
  208. Length: cfg.PlatformConfig.TaskBeforeTime + cfg.PlatformConfig.TaskAfterTime + 1,
  209. Labels: []string{faultLabel},
  210. MasterTopics: masterTopics,
  211. SlaveTopics: slaveTopics,
  212. }
  213. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  214. util.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  215. } else {
  216. // 2-2 如果在旧故障窗口内
  217. global.TimeWindowProducerQueueMutex.RLock()
  218. defer global.TimeWindowProducerQueueMutex.RUnlock()
  219. // 2-2-1 更新故障窗口end时间
  220. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, cfg.PlatformConfig.TaskMaxTime)
  221. expectEnd := util.TimeCustomChange(faultHappenTime, cfg.PlatformConfig.TaskAfterTime)
  222. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  223. lastTimeWindow.TimeWindowEnd = maxEnd
  224. lastTimeWindow.Length = cfg.PlatformConfig.TaskMaxTime
  225. } else {
  226. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  227. lastTimeWindow.TimeWindowEnd = expectEnd
  228. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  229. }
  230. }
  231. // 2-2-2 更新label
  232. labels := lastTimeWindow.Labels
  233. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  234. // 2-2-3 更新 topic
  235. sourceMasterTopics := lastTimeWindow.MasterTopics
  236. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  237. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  238. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  239. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  240. }
  241. }
  242. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  243. // 获取所有需要采集的topic
  244. var faultCodeTopics []string
  245. for _, code := range cfg.CloudConfig.Triggers {
  246. if code.Label == faultLabel {
  247. faultCodeTopics = code.Topics
  248. }
  249. }
  250. return faultCodeTopics, nil
  251. //// 根据不同节点采集的topic进行分配采集
  252. //for _, acceptTopic := range faultCodeTopics {
  253. // for _, host := range cfg.CloudConfig.Hosts {
  254. // for _, topic := range host.Topics {
  255. // if host.Name == cfg.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  256. // masterTopics = append(masterTopics, acceptTopic)
  257. // }
  258. // }
  259. // }
  260. //}
  261. //return masterTopics, slaveTopics
  262. }