produce_window.go 43 KB


  1. package svc
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/package/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "github.com/bluenviron/goroslib/v2"
  11. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. m sync.RWMutex
  21. velocityX float64
  22. velocityY float64
  23. yaw float64
  24. )
  25. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  26. func PrepareTimeWindowProducerQueue() {
  27. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  28. c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfCicvLocation)
  29. var err error
  30. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  31. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  32. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  33. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  34. for i, topic := range commonConfig.SubscribeTopics {
  35. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  36. // 1
  37. if topic == masterConfig.TopicOfAmrPose && len(masterConfig.RuleOfAmrPose) > 0 {
  38. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  39. Node: commonConfig.RosNode,
  40. Topic: topic,
  41. Callback: func(data *visualization_msgs.MarkerArray) {
  42. if len(masterConfig.TopicOfAmrPose) == 0 {
  43. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  44. return
  45. }
  46. subscribersTimeMutexes[i].Lock()
  47. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  48. subscribersMutexes[i].Lock()
  49. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  50. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  51. var faultLabel string
  52. for _, f := range masterConfig.RuleOfAmrPose {
  53. faultLabel = f(data)
  54. if faultLabel != "" {
  55. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  56. subscribersTimes[i] = time.Now()
  57. break
  58. }
  59. }
  60. subscribersMutexes[i].Unlock()
  61. }
  62. subscribersTimeMutexes[i].Unlock()
  63. },
  64. })
  65. }
  66. // 2
  67. if topic == masterConfig.TopicOfBoundingBoxesFast && len(masterConfig.RuleOfBoundingBoxesFast) > 0 {
  68. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  69. Node: commonConfig.RosNode,
  70. Topic: topic,
  71. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  72. if len(masterConfig.TopicOfBoundingBoxesFast) == 0 {
  73. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  74. return
  75. }
  76. subscribersTimeMutexes[i].Lock()
  77. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  78. subscribersMutexes[i].Lock()
  79. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  80. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  81. var faultLabel string
  82. for _, f := range masterConfig.RuleOfBoundingBoxesFast {
  83. faultLabel = f(data)
  84. if faultLabel != "" {
  85. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  86. subscribersTimes[i] = time.Now()
  87. break
  88. }
  89. }
  90. subscribersMutexes[i].Unlock()
  91. }
  92. subscribersTimeMutexes[i].Unlock()
  93. },
  94. })
  95. }
  96. // 3
  97. if topic == masterConfig.TopicOfCameraFault && len(masterConfig.RuleOfCameraFault) > 0 {
  98. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  99. Node: commonConfig.RosNode,
  100. Topic: topic,
  101. Callback: func(data *pjisuv_msgs.FaultVec) {
  102. if len(masterConfig.TopicOfCameraFault) == 0 {
  103. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  104. return
  105. }
  106. subscribersTimeMutexes[i].Lock()
  107. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  108. subscribersMutexes[i].Lock()
  109. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  110. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  111. var faultLabel string
  112. for _, f := range masterConfig.RuleOfCameraFault {
  113. faultLabel = f(data)
  114. if faultLabel != "" {
  115. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  116. subscribersTimes[i] = time.Now()
  117. break
  118. }
  119. }
  120. subscribersMutexes[i].Unlock()
  121. }
  122. subscribersTimeMutexes[i].Unlock()
  123. },
  124. })
  125. }
  126. // 4
  127. if topic == masterConfig.TopicOfCanData && len(masterConfig.RuleOfCanData) > 0 {
  128. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  129. Node: commonConfig.RosNode,
  130. Topic: topic,
  131. Callback: func(data *pjisuv_msgs.Frame) {
  132. if len(masterConfig.TopicOfCanData) == 0 {
  133. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  134. return
  135. }
  136. subscribersTimeMutexes[i].Lock()
  137. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  138. subscribersMutexes[i].Lock()
  139. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  140. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  141. var faultLabel string
  142. for _, f := range masterConfig.RuleOfCanData {
  143. faultLabel = f(data)
  144. if faultLabel != "" {
  145. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  146. subscribersTimes[i] = time.Now()
  147. break
  148. }
  149. }
  150. subscribersMutexes[i].Unlock()
  151. }
  152. subscribersTimeMutexes[i].Unlock()
  153. },
  154. })
  155. }
  156. // 5
  157. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && len(masterConfig.RuleOfCh128x1LslidarPointCloud) > 0 {
  158. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  159. Node: commonConfig.RosNode,
  160. Topic: topic,
  161. Callback: func(data *sensor_msgs.PointCloud2) {
  162. if len(masterConfig.TopicOfCh128x1LslidarPointCloud) == 0 {
  163. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  164. return
  165. }
  166. subscribersTimeMutexes[i].Lock()
  167. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  168. subscribersMutexes[i].Lock()
  169. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  170. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  171. var faultLabel string
  172. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud {
  173. faultLabel = f(data)
  174. if faultLabel != "" {
  175. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  176. subscribersTimes[i] = time.Now()
  177. break
  178. }
  179. }
  180. subscribersMutexes[i].Unlock()
  181. }
  182. subscribersTimeMutexes[i].Unlock()
  183. },
  184. })
  185. }
  186. // 6
  187. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && len(masterConfig.RuleOfCh64wLLslidarPointCloud) > 0 {
  188. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  189. Node: commonConfig.RosNode,
  190. Topic: topic,
  191. Callback: func(data *sensor_msgs.PointCloud2) {
  192. if len(masterConfig.TopicOfCh64wLLslidarPointCloud) == 0 {
  193. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  194. return
  195. }
  196. subscribersTimeMutexes[i].Lock()
  197. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  198. subscribersMutexes[i].Lock()
  199. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  200. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  201. var faultLabel string
  202. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud {
  203. faultLabel = f(data)
  204. if faultLabel != "" {
  205. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  206. subscribersTimes[i] = time.Now()
  207. break
  208. }
  209. }
  210. subscribersMutexes[i].Unlock()
  211. }
  212. subscribersTimeMutexes[i].Unlock()
  213. },
  214. })
  215. }
  216. // 7
  217. if topic == masterConfig.TopicOfCh64wLScan && len(masterConfig.RuleOfCh64wLScan) > 0 {
  218. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  219. Node: commonConfig.RosNode,
  220. Topic: topic,
  221. Callback: func(data *sensor_msgs.LaserScan) {
  222. if len(masterConfig.TopicOfCh64wLScan) == 0 {
  223. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  224. return
  225. }
  226. subscribersTimeMutexes[i].Lock()
  227. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  228. subscribersMutexes[i].Lock()
  229. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  230. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  231. var faultLabel string
  232. for _, f := range masterConfig.RuleOfCh64wLScan {
  233. faultLabel = f(data)
  234. if faultLabel != "" {
  235. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  236. subscribersTimes[i] = time.Now()
  237. break
  238. }
  239. }
  240. subscribersMutexes[i].Unlock()
  241. }
  242. subscribersTimeMutexes[i].Unlock()
  243. },
  244. })
  245. }
  246. // 8
  247. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && len(masterConfig.RuleOfCh64wRLslidarPointCloud) > 0 {
  248. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  249. Node: commonConfig.RosNode,
  250. Topic: topic,
  251. Callback: func(data *sensor_msgs.PointCloud2) {
  252. if len(masterConfig.TopicOfCh64wRLslidarPointCloud) == 0 {
  253. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  254. return
  255. }
  256. subscribersTimeMutexes[i].Lock()
  257. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  258. subscribersMutexes[i].Lock()
  259. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  260. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  261. var faultLabel string
  262. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud {
  263. faultLabel = f(data)
  264. if faultLabel != "" {
  265. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  266. subscribersTimes[i] = time.Now()
  267. break
  268. }
  269. }
  270. subscribersMutexes[i].Unlock()
  271. }
  272. subscribersTimeMutexes[i].Unlock()
  273. },
  274. })
  275. }
  276. // 9
  277. if topic == masterConfig.TopicOfCh64wRScan && len(masterConfig.RuleOfCh64wRScan) > 0 {
  278. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  279. Node: commonConfig.RosNode,
  280. Topic: topic,
  281. Callback: func(data *sensor_msgs.LaserScan) {
  282. if len(masterConfig.TopicOfCh64wRScan) == 0 {
  283. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  284. return
  285. }
  286. subscribersTimeMutexes[i].Lock()
  287. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  288. subscribersMutexes[i].Lock()
  289. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  290. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  291. var faultLabel string
  292. for _, f := range masterConfig.RuleOfCh64wRScan {
  293. faultLabel = f(data)
  294. if faultLabel != "" {
  295. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  296. subscribersTimes[i] = time.Now()
  297. break
  298. }
  299. }
  300. subscribersMutexes[i].Unlock()
  301. }
  302. subscribersTimeMutexes[i].Unlock()
  303. },
  304. })
  305. }
  306. // 10
  307. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && len(masterConfig.RuleOfCicvLidarclusterMovingObjects) > 0 {
  308. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  309. Node: commonConfig.RosNode,
  310. Topic: topic,
  311. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  312. if len(masterConfig.TopicOfCicvLidarclusterMovingObjects) == 0 {
  313. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  314. return
  315. }
  316. subscribersTimeMutexes[i].Lock()
  317. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  318. subscribersMutexes[i].Lock()
  319. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  320. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  321. var faultLabel string
  322. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects {
  323. faultLabel = f(data)
  324. if faultLabel != "" {
  325. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  326. subscribersTimes[i] = time.Now()
  327. break
  328. }
  329. }
  330. subscribersMutexes[i].Unlock()
  331. }
  332. subscribersTimeMutexes[i].Unlock()
  333. },
  334. })
  335. }
  336. // 11
  337. if topic == masterConfig.TopicOfCicvAmrTrajectory && len(masterConfig.RuleOfCicvAmrTrajectory) > 0 {
  338. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  339. Node: commonConfig.RosNode,
  340. Topic: topic,
  341. Callback: func(data *pjisuv_msgs.Trajectory) {
  342. if len(masterConfig.TopicOfCicvAmrTrajectory) == 0 {
  343. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  344. return
  345. }
  346. subscribersTimeMutexes[i].Lock()
  347. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  348. subscribersMutexes[i].Lock()
  349. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  350. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  351. var faultLabel string
  352. for _, f := range masterConfig.RuleOfCicvAmrTrajectory {
  353. faultLabel = f(data)
  354. if faultLabel != "" {
  355. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  356. subscribersTimes[i] = time.Now()
  357. break
  358. }
  359. }
  360. subscribersMutexes[i].Unlock()
  361. }
  362. subscribersTimeMutexes[i].Unlock()
  363. },
  364. })
  365. }
  366. // 12
  367. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  368. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  369. Node: commonConfig.RosNode,
  370. Topic: topic,
  371. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  372. // 更新共享变量
  373. m.RLock()
  374. velocityX = data.VelocityX
  375. velocityY = data.VelocityY
  376. yaw = data.Yaw
  377. m.RUnlock()
  378. if len(masterConfig.TopicOfCicvLocation) == 0 {
  379. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  380. return
  381. }
  382. subscribersTimeMutexes[i].Lock()
  383. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  384. subscribersMutexes[i].Lock()
  385. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  386. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  387. var faultLabel string
  388. for _, f := range masterConfig.RuleOfCicvLocation {
  389. faultLabel = f(data)
  390. if faultLabel != "" {
  391. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  392. break
  393. }
  394. }
  395. subscribersMutexes[i].Unlock()
  396. }
  397. subscribersTimeMutexes[i].Unlock()
  398. },
  399. })
  400. }
  401. // 13
  402. if topic == masterConfig.TopicOfCloudClusters && len(masterConfig.RuleOfCloudClusters) > 0 {
  403. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  404. Node: commonConfig.RosNode,
  405. Topic: topic,
  406. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  407. if len(masterConfig.TopicOfCloudClusters) == 0 {
  408. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  409. return
  410. }
  411. subscribersTimeMutexes[i].Lock()
  412. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  413. subscribersMutexes[i].Lock()
  414. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  415. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  416. var faultLabel string
  417. for _, f := range masterConfig.RuleOfCloudClusters {
  418. faultLabel = f(data)
  419. if faultLabel != "" {
  420. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  421. subscribersTimes[i] = time.Now()
  422. break
  423. }
  424. }
  425. subscribersMutexes[i].Unlock()
  426. }
  427. subscribersTimeMutexes[i].Unlock()
  428. },
  429. })
  430. }
  431. // 14
  432. if topic == masterConfig.TopicOfHeartbeatInfo && len(masterConfig.RuleOfHeartbeatInfo) > 0 {
  433. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  434. Node: commonConfig.RosNode,
  435. Topic: topic,
  436. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  437. if len(masterConfig.TopicOfHeartbeatInfo) == 0 {
  438. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  439. return
  440. }
  441. subscribersTimeMutexes[i].Lock()
  442. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  443. subscribersMutexes[i].Lock()
  444. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  445. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  446. var faultLabel string
  447. for _, f := range masterConfig.RuleOfHeartbeatInfo {
  448. faultLabel = f(data)
  449. if faultLabel != "" {
  450. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  451. subscribersTimes[i] = time.Now()
  452. break
  453. }
  454. }
  455. subscribersMutexes[i].Unlock()
  456. }
  457. subscribersTimeMutexes[i].Unlock()
  458. },
  459. })
  460. }
  461. // 15
  462. if topic == masterConfig.TopicOfLidarPretreatmentCost && len(masterConfig.RuleOfLidarPretreatmentCost) > 0 {
  463. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  464. Node: commonConfig.RosNode,
  465. Topic: topic,
  466. Callback: func(data *geometry_msgs.Vector3Stamped) {
  467. if len(masterConfig.TopicOfLidarPretreatmentCost) == 0 {
  468. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  469. return
  470. }
  471. subscribersTimeMutexes[i].Lock()
  472. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  473. subscribersMutexes[i].Lock()
  474. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  475. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  476. var faultLabel string
  477. for _, f := range masterConfig.RuleOfLidarPretreatmentCost {
  478. faultLabel = f(data)
  479. if faultLabel != "" {
  480. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  481. subscribersTimes[i] = time.Now()
  482. break
  483. }
  484. }
  485. subscribersMutexes[i].Unlock()
  486. }
  487. subscribersTimeMutexes[i].Unlock()
  488. },
  489. })
  490. }
  491. // 16
  492. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && len(masterConfig.RuleOfLidarPretreatmentOdometry) > 0 {
  493. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  494. Node: commonConfig.RosNode,
  495. Topic: topic,
  496. Callback: func(data *nav_msgs.Odometry) {
  497. if len(masterConfig.TopicOfLidarPretreatmentOdometry) == 0 {
  498. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  499. return
  500. }
  501. subscribersTimeMutexes[i].Lock()
  502. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  503. subscribersMutexes[i].Lock()
  504. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  505. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  506. var faultLabel string
  507. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry {
  508. faultLabel = f(data)
  509. if faultLabel != "" {
  510. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  511. subscribersTimes[i] = time.Now()
  512. break
  513. }
  514. }
  515. subscribersMutexes[i].Unlock()
  516. }
  517. subscribersTimeMutexes[i].Unlock()
  518. },
  519. })
  520. }
  521. // 17
  522. if topic == masterConfig.TopicOfLidarRoi && len(masterConfig.RuleOfLidarRoi) > 0 {
  523. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  524. Node: commonConfig.RosNode,
  525. Topic: topic,
  526. Callback: func(data *geometry_msgs.PolygonStamped) {
  527. if len(masterConfig.TopicOfLidarRoi) == 0 {
  528. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  529. return
  530. }
  531. subscribersTimeMutexes[i].Lock()
  532. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  533. subscribersMutexes[i].Lock()
  534. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  535. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  536. var faultLabel string
  537. for _, f := range masterConfig.RuleOfLidarRoi {
  538. faultLabel = f(data)
  539. if faultLabel != "" {
  540. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  541. subscribersTimes[i] = time.Now()
  542. break
  543. }
  544. }
  545. subscribersMutexes[i].Unlock()
  546. }
  547. subscribersTimeMutexes[i].Unlock()
  548. },
  549. })
  550. }
  551. // 18
  552. if topic == masterConfig.TopicOfLine1 && len(masterConfig.RuleOfLine1) > 0 {
  553. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  554. Node: commonConfig.RosNode,
  555. Topic: topic,
  556. Callback: func(data *nav_msgs.Path) {
  557. if len(masterConfig.TopicOfLine1) == 0 {
  558. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  559. return
  560. }
  561. subscribersTimeMutexes[i].Lock()
  562. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  563. subscribersMutexes[i].Lock()
  564. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  565. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  566. var faultLabel string
  567. for _, f := range masterConfig.RuleOfLine1 {
  568. faultLabel = f(data)
  569. if faultLabel != "" {
  570. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  571. subscribersTimes[i] = time.Now()
  572. break
  573. }
  574. }
  575. subscribersMutexes[i].Unlock()
  576. }
  577. subscribersTimeMutexes[i].Unlock()
  578. },
  579. })
  580. }
  581. // 19
  582. if topic == masterConfig.TopicOfLine2 && len(masterConfig.RuleOfLine2) > 0 {
  583. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  584. Node: commonConfig.RosNode,
  585. Topic: topic,
  586. Callback: func(data *nav_msgs.Path) {
  587. if len(masterConfig.TopicOfLine2) == 0 {
  588. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  589. return
  590. }
  591. subscribersTimeMutexes[i].Lock()
  592. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  593. subscribersMutexes[i].Lock()
  594. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  595. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  596. var faultLabel string
  597. for _, f := range masterConfig.RuleOfLine2 {
  598. faultLabel = f(data)
  599. if faultLabel != "" {
  600. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  601. subscribersTimes[i] = time.Now()
  602. break
  603. }
  604. }
  605. subscribersMutexes[i].Unlock()
  606. }
  607. subscribersTimeMutexes[i].Unlock()
  608. },
  609. })
  610. }
  611. // 20
  612. if topic == masterConfig.TopicOfMapPolygon && len(masterConfig.RuleOfMapPolygon) > 0 {
  613. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  614. Node: commonConfig.RosNode,
  615. Topic: topic,
  616. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  617. if len(masterConfig.TopicOfMapPolygon) == 0 {
  618. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  619. return
  620. }
  621. subscribersTimeMutexes[i].Lock()
  622. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  623. subscribersMutexes[i].Lock()
  624. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  625. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  626. var faultLabel string
  627. for _, f := range masterConfig.RuleOfMapPolygon {
  628. faultLabel = f(data)
  629. if faultLabel != "" {
  630. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  631. subscribersTimes[i] = time.Now()
  632. break
  633. }
  634. }
  635. subscribersMutexes[i].Unlock()
  636. }
  637. subscribersTimeMutexes[i].Unlock()
  638. },
  639. })
  640. }
  641. // 21
  642. if topic == masterConfig.TopicOfObstacleDisplay && len(masterConfig.RuleOfObstacleDisplay) > 0 {
  643. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  644. Node: commonConfig.RosNode,
  645. Topic: topic,
  646. Callback: func(data *visualization_msgs.MarkerArray) {
  647. if len(masterConfig.TopicOfObstacleDisplay) == 0 {
  648. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  649. return
  650. }
  651. subscribersTimeMutexes[i].Lock()
  652. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  653. subscribersMutexes[i].Lock()
  654. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  655. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  656. var faultLabel string
  657. for _, f := range masterConfig.RuleOfObstacleDisplay {
  658. faultLabel = f(data)
  659. if faultLabel != "" {
  660. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  661. subscribersTimes[i] = time.Now()
  662. break
  663. }
  664. }
  665. subscribersMutexes[i].Unlock()
  666. }
  667. subscribersTimeMutexes[i].Unlock()
  668. },
  669. })
  670. }
  671. // 22
  672. if topic == masterConfig.TopicOfPjControlPub && len(masterConfig.RuleOfPjControlPub) > 0 {
  673. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  674. Node: commonConfig.RosNode,
  675. Topic: topic,
  676. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  677. if len(masterConfig.TopicOfPjControlPub) == 0 {
  678. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  679. return
  680. }
  681. subscribersTimeMutexes[i].Lock()
  682. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  683. subscribersMutexes[i].Lock()
  684. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  685. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  686. var faultLabel string
  687. for _, f := range masterConfig.RuleOfPjControlPub {
  688. faultLabel = f(data)
  689. if faultLabel != "" {
  690. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  691. subscribersTimes[i] = time.Now()
  692. break
  693. }
  694. }
  695. subscribersMutexes[i].Unlock()
  696. }
  697. subscribersTimeMutexes[i].Unlock()
  698. },
  699. })
  700. }
  701. // 23
  702. if topic == masterConfig.TopicOfPointsCluster && len(masterConfig.RuleOfPointsCluster) > 0 {
  703. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  704. Node: commonConfig.RosNode,
  705. Topic: topic,
  706. Callback: func(data *sensor_msgs.PointCloud2) {
  707. if len(masterConfig.TopicOfPointsCluster) == 0 {
  708. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  709. return
  710. }
  711. subscribersTimeMutexes[i].Lock()
  712. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  713. subscribersMutexes[i].Lock()
  714. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  715. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  716. var faultLabel string
  717. for _, f := range masterConfig.RuleOfPointsCluster {
  718. faultLabel = f(data)
  719. if faultLabel != "" {
  720. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  721. subscribersTimes[i] = time.Now()
  722. break
  723. }
  724. }
  725. subscribersMutexes[i].Unlock()
  726. }
  727. subscribersTimeMutexes[i].Unlock()
  728. },
  729. })
  730. }
  731. // 24
  732. if topic == masterConfig.TopicOfPointsConcat && len(masterConfig.RuleOfPointsConcat) > 0 {
  733. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  734. Node: commonConfig.RosNode,
  735. Topic: topic,
  736. Callback: func(data *sensor_msgs.PointCloud2) {
  737. if len(masterConfig.TopicOfPointsConcat) == 0 {
  738. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  739. return
  740. }
  741. subscribersTimeMutexes[i].Lock()
  742. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  743. subscribersMutexes[i].Lock()
  744. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  745. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  746. var faultLabel string
  747. for _, f := range masterConfig.RuleOfPointsConcat {
  748. faultLabel = f(data)
  749. if faultLabel != "" {
  750. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  751. subscribersTimes[i] = time.Now()
  752. break
  753. }
  754. }
  755. subscribersMutexes[i].Unlock()
  756. }
  757. subscribersTimeMutexes[i].Unlock()
  758. },
  759. })
  760. }
  761. // 25
  762. if topic == masterConfig.TopicOfReferenceDisplay && len(masterConfig.RuleOfReferenceDisplay) > 0 {
  763. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  764. Node: commonConfig.RosNode,
  765. Topic: topic,
  766. Callback: func(data *nav_msgs.Path) {
  767. if len(masterConfig.TopicOfReferenceDisplay) == 0 {
  768. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  769. return
  770. }
  771. subscribersTimeMutexes[i].Lock()
  772. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  773. subscribersMutexes[i].Lock()
  774. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  775. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  776. var faultLabel string
  777. for _, f := range masterConfig.RuleOfReferenceDisplay {
  778. faultLabel = f(data)
  779. if faultLabel != "" {
  780. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  781. subscribersTimes[i] = time.Now()
  782. break
  783. }
  784. }
  785. subscribersMutexes[i].Unlock()
  786. }
  787. subscribersTimeMutexes[i].Unlock()
  788. },
  789. })
  790. }
  791. // 26
  792. if topic == masterConfig.TopicOfReferenceTrajectory && len(masterConfig.RuleOfReferenceTrajectory) > 0 {
  793. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  794. Node: commonConfig.RosNode,
  795. Topic: topic,
  796. Callback: func(data *pjisuv_msgs.Trajectory) {
  797. if len(masterConfig.TopicOfReferenceTrajectory) == 0 {
  798. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  799. return
  800. }
  801. subscribersTimeMutexes[i].Lock()
  802. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  803. subscribersMutexes[i].Lock()
  804. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  805. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  806. var faultLabel string
  807. for _, f := range masterConfig.RuleOfReferenceTrajectory {
  808. faultLabel = f(data)
  809. if faultLabel != "" {
  810. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  811. subscribersTimes[i] = time.Now()
  812. break
  813. }
  814. }
  815. subscribersMutexes[i].Unlock()
  816. }
  817. subscribersTimeMutexes[i].Unlock()
  818. },
  819. })
  820. }
  821. // 27
  822. if topic == masterConfig.TopicOfRoiPoints && len(masterConfig.RuleOfRoiPoints) > 0 {
  823. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  824. Node: commonConfig.RosNode,
  825. Topic: topic,
  826. Callback: func(data *sensor_msgs.PointCloud2) {
  827. if len(masterConfig.TopicOfRoiPoints) == 0 {
  828. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  829. return
  830. }
  831. subscribersTimeMutexes[i].Lock()
  832. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  833. subscribersMutexes[i].Lock()
  834. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  835. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  836. var faultLabel string
  837. for _, f := range masterConfig.RuleOfRoiPoints {
  838. faultLabel = f(data)
  839. if faultLabel != "" {
  840. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  841. subscribersTimes[i] = time.Now()
  842. break
  843. }
  844. }
  845. subscribersMutexes[i].Unlock()
  846. }
  847. subscribersTimeMutexes[i].Unlock()
  848. },
  849. })
  850. }
  851. // 28
  852. if topic == masterConfig.TopicOfRoiPolygon && len(masterConfig.RuleOfRoiPolygon) > 0 {
  853. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  854. Node: commonConfig.RosNode,
  855. Topic: topic,
  856. Callback: func(data *nav_msgs.Path) {
  857. if len(masterConfig.TopicOfRoiPolygon) == 0 {
  858. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  859. return
  860. }
  861. subscribersTimeMutexes[i].Lock()
  862. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  863. subscribersMutexes[i].Lock()
  864. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  865. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  866. var faultLabel string
  867. for _, f := range masterConfig.RuleOfRoiPolygon {
  868. faultLabel = f(data)
  869. if faultLabel != "" {
  870. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  871. subscribersTimes[i] = time.Now()
  872. break
  873. }
  874. }
  875. subscribersMutexes[i].Unlock()
  876. }
  877. subscribersTimeMutexes[i].Unlock()
  878. },
  879. })
  880. }
  881. // 29
  882. if topic == masterConfig.TopicOfTf && len(masterConfig.RuleOfTf) > 0 {
  883. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  884. Node: commonConfig.RosNode,
  885. Topic: topic,
  886. Callback: func(data *tf2_msgs.TFMessage) {
  887. if len(masterConfig.TopicOfTf) == 0 {
  888. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  889. return
  890. }
  891. subscribersTimeMutexes[i].Lock()
  892. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  893. subscribersMutexes[i].Lock()
  894. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  895. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  896. var faultLabel string
  897. for _, f := range masterConfig.RuleOfTf {
  898. faultLabel = f(data)
  899. if faultLabel != "" {
  900. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  901. subscribersTimes[i] = time.Now()
  902. break
  903. }
  904. }
  905. subscribersMutexes[i].Unlock()
  906. }
  907. subscribersTimeMutexes[i].Unlock()
  908. },
  909. })
  910. }
  911. // 30
  912. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  913. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  914. Node: commonConfig.RosNode,
  915. Topic: topic,
  916. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  917. if len(masterConfig.TopicOfTpperception) == 0 {
  918. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  919. return
  920. }
  921. subscribersTimeMutexes[i].Lock()
  922. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  923. subscribersMutexes[i].Lock()
  924. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  925. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  926. var faultLabel string
  927. for _, f := range masterConfig.RuleOfTpperception {
  928. faultLabel = f(data, velocityX, velocityY, yaw)
  929. if faultLabel != "" {
  930. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  931. break
  932. }
  933. }
  934. subscribersMutexes[i].Unlock()
  935. }
  936. subscribersTimeMutexes[i].Unlock()
  937. },
  938. })
  939. }
  940. // 31
  941. if topic == masterConfig.TopicOfTpperceptionVis && len(masterConfig.RuleOfTpperceptionVis) > 0 {
  942. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  943. Node: commonConfig.RosNode,
  944. Topic: topic,
  945. Callback: func(data *visualization_msgs.MarkerArray) {
  946. if len(masterConfig.TopicOfTpperceptionVis) == 0 {
  947. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  948. return
  949. }
  950. subscribersTimeMutexes[i].Lock()
  951. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  952. subscribersMutexes[i].Lock()
  953. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  954. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  955. var faultLabel string
  956. for _, f := range masterConfig.RuleOfTpperceptionVis {
  957. faultLabel = f(data)
  958. if faultLabel != "" {
  959. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  960. subscribersTimes[i] = time.Now()
  961. break
  962. }
  963. }
  964. subscribersMutexes[i].Unlock()
  965. }
  966. subscribersTimeMutexes[i].Unlock()
  967. },
  968. })
  969. }
  970. // 32
  971. if topic == masterConfig.TopicOfTprouteplan && len(masterConfig.RuleOfTprouteplan) > 0 {
  972. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  973. Node: commonConfig.RosNode,
  974. Topic: topic,
  975. Callback: func(data *pjisuv_msgs.RoutePlan) {
  976. if len(masterConfig.TopicOfTprouteplan) == 0 {
  977. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  978. return
  979. }
  980. subscribersTimeMutexes[i].Lock()
  981. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  982. subscribersMutexes[i].Lock()
  983. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  984. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  985. var faultLabel string
  986. for _, f := range masterConfig.RuleOfTprouteplan {
  987. faultLabel = f(data)
  988. if faultLabel != "" {
  989. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  990. subscribersTimes[i] = time.Now()
  991. break
  992. }
  993. }
  994. subscribersMutexes[i].Unlock()
  995. }
  996. subscribersTimeMutexes[i].Unlock()
  997. },
  998. })
  999. }
  1000. // 33
  1001. if topic == masterConfig.TopicOfTrajectoryDisplay && len(masterConfig.RuleOfTrajectoryDisplay) > 0 {
  1002. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1003. Node: commonConfig.RosNode,
  1004. Topic: topic,
  1005. Callback: func(data *nav_msgs.Path) {
  1006. if len(masterConfig.TopicOfTrajectoryDisplay) == 0 {
  1007. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  1008. return
  1009. }
  1010. subscribersTimeMutexes[i].Lock()
  1011. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1012. subscribersMutexes[i].Lock()
  1013. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1014. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  1015. var faultLabel string
  1016. for _, f := range masterConfig.RuleOfTrajectoryDisplay {
  1017. faultLabel = f(data)
  1018. if faultLabel != "" {
  1019. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1020. subscribersTimes[i] = time.Now()
  1021. break
  1022. }
  1023. }
  1024. subscribersMutexes[i].Unlock()
  1025. }
  1026. subscribersTimeMutexes[i].Unlock()
  1027. },
  1028. })
  1029. }
  1030. // 34
  1031. if topic == masterConfig.TopicOfUngroundCloudpoints && len(masterConfig.RuleOfUngroundCloudpoints) > 0 {
  1032. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1033. Node: commonConfig.RosNode,
  1034. Topic: topic,
  1035. Callback: func(data *sensor_msgs.PointCloud2) {
  1036. if len(masterConfig.TopicOfUngroundCloudpoints) == 0 {
  1037. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  1038. return
  1039. }
  1040. subscribersTimeMutexes[i].Lock()
  1041. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1042. subscribersMutexes[i].Lock()
  1043. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1044. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  1045. var faultLabel string
  1046. for _, f := range masterConfig.RuleOfUngroundCloudpoints {
  1047. faultLabel = f(data)
  1048. if faultLabel != "" {
  1049. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1050. subscribersTimes[i] = time.Now()
  1051. break
  1052. }
  1053. }
  1054. subscribersMutexes[i].Unlock()
  1055. }
  1056. subscribersTimeMutexes[i].Unlock()
  1057. },
  1058. })
  1059. }
  1060. // 35
  1061. if topic == masterConfig.TopicOfDataRead && len(masterConfig.RuleOfDataRead) > 0 {
  1062. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1063. Node: commonConfig.RosNode,
  1064. Topic: topic,
  1065. Callback: func(data *pjisuv_msgs.Retrieval) {
  1066. if len(masterConfig.TopicOfDataRead) == 0 {
  1067. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  1068. return
  1069. }
  1070. subscribersTimeMutexes[i].Lock()
  1071. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1072. subscribersMutexes[i].Lock()
  1073. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1074. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  1075. var faultLabel string
  1076. for _, f := range masterConfig.RuleOfDataRead {
  1077. faultLabel = f(data)
  1078. if faultLabel != "" {
  1079. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1080. subscribersTimes[i] = time.Now()
  1081. break
  1082. }
  1083. }
  1084. subscribersMutexes[i].Unlock()
  1085. }
  1086. subscribersTimeMutexes[i].Unlock()
  1087. },
  1088. })
  1089. }
  1090. if err != nil {
  1091. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1092. //TODO 如何回传日志
  1093. continue
  1094. }
  1095. }
  1096. select {
  1097. case signal := <-service.ChannelKillWindowProducer:
  1098. if signal == 1 {
  1099. commonConfig.RosNode.Close()
  1100. service.AddKillTimes("3")
  1101. return
  1102. }
  1103. }
  1104. }
  1105. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) {
  1106. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1107. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  1108. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  1109. newTimeWindow := entity.TimeWindow{
  1110. FaultTime: faultHappenTime,
  1111. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  1112. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  1113. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  1114. Labels: []string{faultLabel},
  1115. MasterTopics: masterTopics,
  1116. SlaveTopics: slaveTopics,
  1117. }
  1118. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1119. entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1120. } else {
  1121. // 2-2 如果在旧故障窗口内
  1122. entity.TimeWindowProducerQueueMutex.RLock()
  1123. defer entity.TimeWindowProducerQueueMutex.RUnlock()
  1124. // 2-2-1 更新故障窗口end时间
  1125. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  1126. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1127. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  1128. lastTimeWindow.TimeWindowEnd = maxEnd
  1129. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  1130. } else {
  1131. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1132. lastTimeWindow.TimeWindowEnd = expectEnd
  1133. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1134. }
  1135. }
  1136. // 2-2-2 更新label
  1137. labels := lastTimeWindow.Labels
  1138. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1139. // 2-2-3 更新 topic
  1140. sourceMasterTopics := lastTimeWindow.MasterTopics
  1141. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1142. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1143. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1144. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1145. }
  1146. }
  1147. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1148. // 获取所有需要采集的topic
  1149. var faultCodeTopics []string
  1150. for _, code := range commonConfig.CloudConfig.Triggers {
  1151. if code.Label == faultLabel {
  1152. faultCodeTopics = code.Topics
  1153. }
  1154. }
  1155. // 根据不同节点采集的topic进行分配采集
  1156. for _, acceptTopic := range faultCodeTopics {
  1157. for _, host := range commonConfig.CloudConfig.Hosts {
  1158. for _, topic := range host.Topics {
  1159. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1160. masterTopics = append(masterTopics, acceptTopic)
  1161. }
  1162. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1163. slaveTopics = append(slaveTopics, acceptTopic)
  1164. }
  1165. }
  1166. }
  1167. }
  1168. return masterTopics, slaveTopics
  1169. }