produce_window.go 68 KB


  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "cicv-data-closedloop/pjisuv_ticker"
  11. "github.com/bluenviron/goroslib/v2"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  16. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  17. "math"
  18. "sync"
  19. "time"
  20. )
  21. var (
  22. // ----------------------------- shared variables
  23. // /tpperception — NOTE(review): all three maps are keyed by uint32, presumably an object ID; the code that fills them is not visible here, confirm.
  24. objDicOfTpperception = make(map[uint32][]float32)
  25. objTypeDicOfTpperception = make(map[uint32]uint8)
  26. objSpeedDicOfTpperception = make(map[uint32]float64)
  27. // /pji_control_pub — NOTE(review): presumably a message counter plus steering/throttle command samples; confirm against the writer.
  28. numCountPjiControlCommandOfPjControlPub int
  29. egoSteeringCmdOfPjControlPub []float64
  30. egoThrottleCmdOfPjControlPub []float64
  31. // /data_read — NOTE(review): presumably a message counter plus measured steering/throttle samples; confirm against the writer.
  32. numCountDataReadOfDataRead int
  33. egoSteeringRealOfDataRead []float64
  34. egoThrottleRealOfDataRead []float64
  35. // --------------------------------------------------
  36. shareVars = new(sync.Map) // handed to the ticker rules and to the rule functions that take shared state; subscriber callbacks Store values into it
  37. saveTimeWindowMutex sync.Mutex // saving a time window requires a lock to prevent data races
  38. latestTimeWindowEnd = util.GetTimeCustom(time.Now()) // initialised from the current time when the package is initialised
  39. triggerInterval = 3.0 // seconds; each trigger fires once per 3 seconds (callbacks compare time.Since(last trigger).Seconds() against this)
  40. )
  41. // ProduceWindow listens on all subscribed topics and modifies the time window.
  42. func ProduceWindow() {
  43. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  44. var err error
  45. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  46. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  47. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  48. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  49. for i, topic := range commonConfig.SubscribeTopics {
  50. for {
  51. // 定时器,区别于订阅者
  52. if topic == pjisuv_ticker.TickerTopic {
  53. // 1 把所有触发器函数执行一遍,触发器内部创建额外的定时任务goroutine
  54. for _, f := range masterConfig.RuleOfCicvTicker {
  55. f(shareVars)
  56. }
  57. // 2 创建goroutine接收定时任务触发器返回的Label和Time,执行触发逻辑
  58. go func() {
  59. for {
  60. time.Sleep(time.Duration(1)) // 降低循环频率
  61. select {
  62. case tickInfo := <-pjisuv_ticker.TickerChan:
  63. faultLabel := tickInfo.FaultLabel
  64. faultHappenTime := tickInfo.FaultHappenTime
  65. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  66. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  67. default:
  68. }
  69. }
  70. }()
  71. }
  72. // 其他常规监听器
  73. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  74. // 1
  75. if topic == masterConfig.TopicOfAmrPose && (len(masterConfig.RuleOfAmrPose1) > 0 || len(masterConfig.RuleOfAmrPose3) > 0) {
  76. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  77. Node: commonConfig.RosNode,
  78. Topic: topic,
  79. Callback: func(data *visualization_msgs.MarkerArray) {
  80. subscribersTimeMutexes[i].Lock()
  81. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  82. subscribersMutexes[i].Lock()
  83. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  84. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  85. faultLabel := ""
  86. if len(masterConfig.RuleOfAmrPose1) > 0 {
  87. for _, f := range masterConfig.RuleOfAmrPose1 {
  88. faultLabel = f(data)
  89. if faultLabel != "" {
  90. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  91. subscribersTimes[i] = time.Now()
  92. goto TriggerSuccess
  93. }
  94. }
  95. }
  96. if len(masterConfig.RuleOfAmrPose3) > 0 {
  97. for _, f := range masterConfig.RuleOfAmrPose3 {
  98. faultLabel = f(shareVars, data)
  99. if faultLabel != "" {
  100. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  101. subscribersTimes[i] = time.Now()
  102. goto TriggerSuccess
  103. }
  104. }
  105. }
  106. TriggerSuccess:
  107. subscribersMutexes[i].Unlock()
  108. }
  109. subscribersTimeMutexes[i].Unlock()
  110. },
  111. })
  112. }
  113. // 2
  114. if topic == masterConfig.TopicOfBoundingBoxesFast &&
  115. (len(masterConfig.RuleOfBoundingBoxesFast1) > 0 ||
  116. len(masterConfig.RuleOfBoundingBoxesFast3) > 0) {
  117. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  118. Node: commonConfig.RosNode,
  119. Topic: topic,
  120. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  121. subscribersTimeMutexes[i].Lock()
  122. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  123. subscribersMutexes[i].Lock()
  124. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  125. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  126. faultLabel := ""
  127. if len(masterConfig.RuleOfBoundingBoxesFast1) > 0 {
  128. for _, f := range masterConfig.RuleOfBoundingBoxesFast1 {
  129. faultLabel = f(data)
  130. if faultLabel != "" {
  131. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  132. subscribersTimes[i] = time.Now()
  133. goto TriggerSuccess
  134. }
  135. }
  136. }
  137. if len(masterConfig.RuleOfBoundingBoxesFast3) > 0 {
  138. for _, f := range masterConfig.RuleOfBoundingBoxesFast3 {
  139. faultLabel = f(shareVars, data)
  140. if faultLabel != "" {
  141. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  142. subscribersTimes[i] = time.Now()
  143. goto TriggerSuccess
  144. }
  145. }
  146. }
  147. TriggerSuccess:
  148. subscribersMutexes[i].Unlock()
  149. }
  150. subscribersTimeMutexes[i].Unlock()
  151. },
  152. })
  153. }
  154. // 3
  155. if topic == masterConfig.TopicOfCameraFault &&
  156. (len(masterConfig.RuleOfCameraFault1) > 0 ||
  157. len(masterConfig.RuleOfCameraFault3) > 0) {
  158. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  159. Node: commonConfig.RosNode,
  160. Topic: topic,
  161. Callback: func(data *pjisuv_msgs.FaultVec) {
  162. subscribersTimeMutexes[i].Lock()
  163. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  164. subscribersMutexes[i].Lock()
  165. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  166. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  167. faultLabel := ""
  168. if len(masterConfig.RuleOfCameraFault1) > 0 {
  169. for _, f := range masterConfig.RuleOfCameraFault1 {
  170. faultLabel = f(data)
  171. if faultLabel != "" {
  172. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  173. subscribersTimes[i] = time.Now()
  174. goto TriggerSuccess
  175. }
  176. }
  177. }
  178. if len(masterConfig.RuleOfCameraFault3) > 0 {
  179. for _, f := range masterConfig.RuleOfCameraFault3 {
  180. faultLabel = f(shareVars, data)
  181. if faultLabel != "" {
  182. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  183. subscribersTimes[i] = time.Now()
  184. goto TriggerSuccess
  185. }
  186. }
  187. }
  188. TriggerSuccess:
  189. subscribersMutexes[i].Unlock()
  190. }
  191. subscribersTimeMutexes[i].Unlock()
  192. },
  193. })
  194. }
  195. // 4
  196. if topic == masterConfig.TopicOfCanData &&
  197. (len(masterConfig.RuleOfCanData1) > 0 ||
  198. len(masterConfig.RuleOfCanData3) > 0) {
  199. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  200. Node: commonConfig.RosNode,
  201. Topic: topic,
  202. Callback: func(data *pjisuv_msgs.Frame) {
  203. subscribersTimeMutexes[i].Lock()
  204. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  205. subscribersMutexes[i].Lock()
  206. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  207. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  208. faultLabel := ""
  209. if len(masterConfig.RuleOfCanData1) > 0 {
  210. for _, f := range masterConfig.RuleOfCanData1 {
  211. faultLabel = f(data)
  212. if faultLabel != "" {
  213. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  214. subscribersTimes[i] = time.Now()
  215. goto TriggerSuccess
  216. }
  217. }
  218. }
  219. if len(masterConfig.RuleOfCanData3) > 0 {
  220. for _, f := range masterConfig.RuleOfCanData3 {
  221. faultLabel = f(shareVars, data)
  222. if faultLabel != "" {
  223. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  224. subscribersTimes[i] = time.Now()
  225. goto TriggerSuccess
  226. }
  227. }
  228. }
  229. TriggerSuccess:
  230. subscribersMutexes[i].Unlock()
  231. }
  232. subscribersTimeMutexes[i].Unlock()
  233. },
  234. })
  235. }
  236. // 5
  237. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud &&
  238. (len(masterConfig.RuleOfCh128x1LslidarPointCloud1) > 0 ||
  239. len(masterConfig.RuleOfCh128x1LslidarPointCloud3) > 0) {
  240. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  241. Node: commonConfig.RosNode,
  242. Topic: topic,
  243. Callback: func(data *sensor_msgs.PointCloud2) {
  244. subscribersTimeMutexes[i].Lock()
  245. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  246. subscribersMutexes[i].Lock()
  247. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  248. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  249. faultLabel := ""
  250. if len(masterConfig.RuleOfCh128x1LslidarPointCloud1) > 0 {
  251. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud1 {
  252. faultLabel = f(data)
  253. if faultLabel != "" {
  254. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  255. subscribersTimes[i] = time.Now()
  256. goto TriggerSuccess
  257. }
  258. }
  259. }
  260. if len(masterConfig.RuleOfCh128x1LslidarPointCloud3) > 0 {
  261. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud3 {
  262. faultLabel = f(shareVars, data)
  263. if faultLabel != "" {
  264. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  265. subscribersTimes[i] = time.Now()
  266. goto TriggerSuccess
  267. }
  268. }
  269. }
  270. TriggerSuccess:
  271. subscribersMutexes[i].Unlock()
  272. }
  273. subscribersTimeMutexes[i].Unlock()
  274. },
  275. })
  276. }
  277. // 6
  278. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud &&
  279. (len(masterConfig.RuleOfCh64wLLslidarPointCloud1) > 0 ||
  280. len(masterConfig.RuleOfCh64wLLslidarPointCloud3) > 0) {
  281. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  282. Node: commonConfig.RosNode,
  283. Topic: topic,
  284. Callback: func(data *sensor_msgs.PointCloud2) {
  285. subscribersTimeMutexes[i].Lock()
  286. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  287. subscribersMutexes[i].Lock()
  288. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  289. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  290. faultLabel := ""
  291. if len(masterConfig.RuleOfCh64wLLslidarPointCloud1) > 0 {
  292. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud1 {
  293. faultLabel = f(data)
  294. if faultLabel != "" {
  295. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  296. subscribersTimes[i] = time.Now()
  297. goto TriggerSuccess
  298. }
  299. }
  300. }
  301. if len(masterConfig.RuleOfCh64wLLslidarPointCloud3) > 0 {
  302. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud3 {
  303. faultLabel = f(shareVars, data)
  304. if faultLabel != "" {
  305. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  306. subscribersTimes[i] = time.Now()
  307. goto TriggerSuccess
  308. }
  309. }
  310. }
  311. TriggerSuccess:
  312. subscribersMutexes[i].Unlock()
  313. }
  314. subscribersTimeMutexes[i].Unlock()
  315. },
  316. })
  317. }
  318. // 7
  319. if topic == masterConfig.TopicOfCh64wLScan &&
  320. (len(masterConfig.RuleOfCh64wLScan1) > 0 ||
  321. len(masterConfig.RuleOfCh64wLScan3) > 0) {
  322. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  323. Node: commonConfig.RosNode,
  324. Topic: topic,
  325. Callback: func(data *sensor_msgs.LaserScan) {
  326. subscribersTimeMutexes[i].Lock()
  327. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  328. subscribersMutexes[i].Lock()
  329. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  330. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  331. faultLabel := ""
  332. if len(masterConfig.RuleOfCh64wLScan1) > 0 {
  333. for _, f := range masterConfig.RuleOfCh64wLScan1 {
  334. faultLabel = f(data)
  335. if faultLabel != "" {
  336. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  337. subscribersTimes[i] = time.Now()
  338. goto TriggerSuccess
  339. }
  340. }
  341. }
  342. if len(masterConfig.RuleOfCh64wLScan3) > 0 {
  343. for _, f := range masterConfig.RuleOfCh64wLScan3 {
  344. faultLabel = f(shareVars, data)
  345. if faultLabel != "" {
  346. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  347. subscribersTimes[i] = time.Now()
  348. goto TriggerSuccess
  349. }
  350. }
  351. }
  352. TriggerSuccess:
  353. subscribersMutexes[i].Unlock()
  354. }
  355. subscribersTimeMutexes[i].Unlock()
  356. },
  357. })
  358. }
  359. // 8
  360. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud &&
  361. (len(masterConfig.RuleOfCh64wRLslidarPointCloud1) > 0 ||
  362. len(masterConfig.RuleOfCh64wRLslidarPointCloud3) > 0) {
  363. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  364. Node: commonConfig.RosNode,
  365. Topic: topic,
  366. Callback: func(data *sensor_msgs.PointCloud2) {
  367. subscribersTimeMutexes[i].Lock()
  368. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  369. subscribersMutexes[i].Lock()
  370. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  371. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  372. faultLabel := ""
  373. if len(masterConfig.RuleOfCh64wRLslidarPointCloud1) > 0 {
  374. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud1 {
  375. faultLabel = f(data)
  376. if faultLabel != "" {
  377. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  378. subscribersTimes[i] = time.Now()
  379. goto TriggerSuccess
  380. }
  381. }
  382. }
  383. if len(masterConfig.RuleOfCh64wRLslidarPointCloud3) > 0 {
  384. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud3 {
  385. faultLabel = f(shareVars, data)
  386. if faultLabel != "" {
  387. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  388. subscribersTimes[i] = time.Now()
  389. goto TriggerSuccess
  390. }
  391. }
  392. }
  393. TriggerSuccess:
  394. subscribersMutexes[i].Unlock()
  395. }
  396. subscribersTimeMutexes[i].Unlock()
  397. },
  398. })
  399. }
  400. // 9
  401. if topic == masterConfig.TopicOfCh64wRScan &&
  402. (len(masterConfig.RuleOfCh64wRScan1) > 0 ||
  403. len(masterConfig.RuleOfCh64wRScan3) > 0) {
  404. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  405. Node: commonConfig.RosNode,
  406. Topic: topic,
  407. Callback: func(data *sensor_msgs.LaserScan) {
  408. subscribersTimeMutexes[i].Lock()
  409. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  410. subscribersMutexes[i].Lock()
  411. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  412. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  413. faultLabel := ""
  414. if len(masterConfig.RuleOfCh64wRScan1) > 0 {
  415. for _, f := range masterConfig.RuleOfCh64wRScan1 {
  416. faultLabel = f(data)
  417. if faultLabel != "" {
  418. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  419. subscribersTimes[i] = time.Now()
  420. goto TriggerSuccess
  421. }
  422. }
  423. }
  424. if len(masterConfig.RuleOfCh64wRScan3) > 0 {
  425. for _, f := range masterConfig.RuleOfCh64wRScan3 {
  426. faultLabel = f(shareVars, data)
  427. if faultLabel != "" {
  428. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  429. subscribersTimes[i] = time.Now()
  430. goto TriggerSuccess
  431. }
  432. }
  433. }
  434. TriggerSuccess:
  435. subscribersMutexes[i].Unlock()
  436. }
  437. subscribersTimeMutexes[i].Unlock()
  438. },
  439. })
  440. }
  441. // 10
  442. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects &&
  443. (len(masterConfig.RuleOfCicvLidarclusterMovingObjects1) > 0 ||
  444. len(masterConfig.RuleOfCicvLidarclusterMovingObjects3) > 0) {
  445. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  446. Node: commonConfig.RosNode,
  447. Topic: topic,
  448. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  449. subscribersTimeMutexes[i].Lock()
  450. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  451. subscribersMutexes[i].Lock()
  452. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  453. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  454. faultLabel := ""
  455. if len(masterConfig.RuleOfCicvLidarclusterMovingObjects1) > 0 {
  456. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects1 {
  457. faultLabel = f(data)
  458. if faultLabel != "" {
  459. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  460. subscribersTimes[i] = time.Now()
  461. goto TriggerSuccess
  462. }
  463. }
  464. }
  465. if len(masterConfig.RuleOfCicvLidarclusterMovingObjects3) > 0 {
  466. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects3 {
  467. faultLabel = f(shareVars, data)
  468. if faultLabel != "" {
  469. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  470. subscribersTimes[i] = time.Now()
  471. goto TriggerSuccess
  472. }
  473. }
  474. }
  475. TriggerSuccess:
  476. subscribersMutexes[i].Unlock()
  477. }
  478. subscribersTimeMutexes[i].Unlock()
  479. },
  480. })
  481. }
  482. // 11 有共享变量的订阅者必须被创建
  483. if topic == masterConfig.TopicOfCicvAmrTrajectory {
  484. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  485. Node: commonConfig.RosNode,
  486. Topic: topic,
  487. Callback: func(data *pjisuv_msgs.Trajectory) {
  488. subscribersTimeMutexes[i].Lock()
  489. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  490. subscribersMutexes[i].Lock()
  491. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  492. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  493. faultLabel := ""
  494. if len(masterConfig.RuleOfCicvAmrTrajectory1) > 0 {
  495. for _, f := range masterConfig.RuleOfCicvAmrTrajectory1 {
  496. faultLabel = f(data)
  497. if faultLabel != "" {
  498. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  499. subscribersTimes[i] = time.Now()
  500. goto TriggerSuccess
  501. }
  502. }
  503. }
  504. if len(masterConfig.RuleOfCicvAmrTrajectory3) > 0 {
  505. for _, f := range masterConfig.RuleOfCicvAmrTrajectory3 {
  506. faultLabel = f(shareVars, data)
  507. if faultLabel != "" {
  508. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  509. subscribersTimes[i] = time.Now()
  510. goto TriggerSuccess
  511. }
  512. }
  513. }
  514. TriggerSuccess:
  515. subscribersMutexes[i].Unlock()
  516. }
  517. subscribersTimeMutexes[i].Unlock()
  518. // 更新共享变量
  519. currentCurvateres := make([]float64, 0)
  520. for _, point := range data.Trajectoryinfo.Trajectorypoints {
  521. currentCurvateres = append(currentCurvateres, math.Abs(float64(point.Curvature)))
  522. }
  523. shareVars.Store("LastCurvaturesOfCicvAmrTrajectory", currentCurvateres)
  524. shareVars.Store("DecisionType", data.Trajectoryinfo.DecisionType)
  525. },
  526. })
  527. }
  528. // 12 有共享变量的订阅者必须被创建
  529. if topic == masterConfig.TopicOfCicvLocation {
  530. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  531. Node: commonConfig.RosNode,
  532. Topic: topic,
  533. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  534. subscribersTimeMutexes[i].Lock()
  535. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  536. subscribersMutexes[i].Lock()
  537. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  538. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  539. faultLabel := ""
  540. if len(masterConfig.RuleOfCicvLocation1) > 0 {
  541. for _, f := range masterConfig.RuleOfCicvLocation1 {
  542. faultLabel = f(data)
  543. if faultLabel != "" {
  544. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  545. subscribersTimes[i] = time.Now()
  546. goto TriggerSuccess
  547. }
  548. }
  549. }
  550. if len(masterConfig.RuleOfCicvLocation3) > 0 {
  551. for _, f := range masterConfig.RuleOfCicvLocation3 {
  552. faultLabel = f(shareVars, data)
  553. if faultLabel != "" {
  554. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  555. subscribersTimes[i] = time.Now()
  556. goto TriggerSuccess
  557. }
  558. }
  559. }
  560. TriggerSuccess:
  561. subscribersMutexes[i].Unlock()
  562. }
  563. subscribersTimeMutexes[i].Unlock()
  564. // 更新共享变量
  565. shareVars.Store("VelocityXOfCicvLocation", data.VelocityX)
  566. shareVars.Store("VelocityYOfCicvLocation", data.VelocityY)
  567. shareVars.Store("VelocityZOfCicvLocation", data.VelocityZ)
  568. shareVars.Store("YawOfCicvLocation", data.Yaw)
  569. shareVars.Store("AngularVelocityZOfCicvLocation", data.AngularVelocityZ)
  570. shareVars.Store("PositionXOfCicvLocation", data.PositionX)
  571. shareVars.Store("PositionYOfCicvLocation", data.PositionY)
  572. },
  573. })
  574. }
  575. // 13
  576. if topic == masterConfig.TopicOfCloudClusters &&
  577. (len(masterConfig.RuleOfCloudClusters1) > 0 ||
  578. len(masterConfig.RuleOfCloudClusters3) > 0) {
  579. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  580. Node: commonConfig.RosNode,
  581. Topic: topic,
  582. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  583. subscribersTimeMutexes[i].Lock()
  584. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  585. subscribersMutexes[i].Lock()
  586. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  587. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  588. faultLabel := ""
  589. if len(masterConfig.RuleOfCloudClusters1) > 0 {
  590. for _, f := range masterConfig.RuleOfCloudClusters1 {
  591. faultLabel = f(data)
  592. if faultLabel != "" {
  593. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  594. subscribersTimes[i] = time.Now()
  595. goto TriggerSuccess
  596. }
  597. }
  598. }
  599. if len(masterConfig.RuleOfCloudClusters3) > 0 {
  600. for _, f := range masterConfig.RuleOfCloudClusters3 {
  601. faultLabel = f(shareVars, data)
  602. if faultLabel != "" {
  603. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  604. subscribersTimes[i] = time.Now()
  605. goto TriggerSuccess
  606. }
  607. }
  608. }
  609. TriggerSuccess:
  610. subscribersMutexes[i].Unlock()
  611. }
  612. subscribersTimeMutexes[i].Unlock()
  613. },
  614. })
  615. }
  616. // 14
  617. if topic == masterConfig.TopicOfHeartbeatInfo &&
  618. (len(masterConfig.RuleOfHeartbeatInfo1) > 0 ||
  619. len(masterConfig.RuleOfHeartbeatInfo3) > 0) {
  620. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  621. Node: commonConfig.RosNode,
  622. Topic: topic,
  623. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  624. subscribersTimeMutexes[i].Lock()
  625. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  626. subscribersMutexes[i].Lock()
  627. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  628. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  629. faultLabel := ""
  630. if len(masterConfig.RuleOfHeartbeatInfo1) > 0 {
  631. for _, f := range masterConfig.RuleOfHeartbeatInfo1 {
  632. faultLabel = f(data)
  633. if faultLabel != "" {
  634. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  635. subscribersTimes[i] = time.Now()
  636. goto TriggerSuccess
  637. }
  638. }
  639. }
  640. if len(masterConfig.RuleOfHeartbeatInfo3) > 0 {
  641. for _, f := range masterConfig.RuleOfHeartbeatInfo3 {
  642. faultLabel = f(shareVars, data)
  643. if faultLabel != "" {
  644. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  645. subscribersTimes[i] = time.Now()
  646. goto TriggerSuccess
  647. }
  648. }
  649. }
  650. TriggerSuccess:
  651. subscribersMutexes[i].Unlock()
  652. }
  653. subscribersTimeMutexes[i].Unlock()
  654. },
  655. })
  656. }
  657. // 15
  658. if topic == masterConfig.TopicOfLidarPretreatmentCost &&
  659. (len(masterConfig.RuleOfLidarPretreatmentCost1) > 0 ||
  660. len(masterConfig.RuleOfLidarPretreatmentCost3) > 0) {
  661. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  662. Node: commonConfig.RosNode,
  663. Topic: topic,
  664. Callback: func(data *geometry_msgs.Vector3Stamped) {
  665. subscribersTimeMutexes[i].Lock()
  666. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  667. subscribersMutexes[i].Lock()
  668. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  669. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  670. faultLabel := ""
  671. if len(masterConfig.RuleOfLidarPretreatmentCost1) > 0 {
  672. for _, f := range masterConfig.RuleOfLidarPretreatmentCost1 {
  673. faultLabel = f(data)
  674. if faultLabel != "" {
  675. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  676. subscribersTimes[i] = time.Now()
  677. goto TriggerSuccess
  678. }
  679. }
  680. }
  681. if len(masterConfig.RuleOfLidarPretreatmentCost3) > 0 {
  682. for _, f := range masterConfig.RuleOfLidarPretreatmentCost3 {
  683. faultLabel = f(shareVars, data)
  684. if faultLabel != "" {
  685. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  686. subscribersTimes[i] = time.Now()
  687. goto TriggerSuccess
  688. }
  689. }
  690. }
  691. TriggerSuccess:
  692. subscribersMutexes[i].Unlock()
  693. }
  694. subscribersTimeMutexes[i].Unlock()
  695. },
  696. })
  697. }
  698. // 16
  699. if topic == masterConfig.TopicOfLidarPretreatmentOdometry &&
  700. (len(masterConfig.RuleOfLidarPretreatmentOdometry1) > 0 ||
  701. len(masterConfig.RuleOfLidarPretreatmentOdometry3) > 0) {
  702. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  703. Node: commonConfig.RosNode,
  704. Topic: topic,
  705. Callback: func(data *nav_msgs.Odometry) {
  706. subscribersTimeMutexes[i].Lock()
  707. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  708. subscribersMutexes[i].Lock()
  709. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  710. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  711. faultLabel := ""
  712. if len(masterConfig.RuleOfLidarPretreatmentOdometry1) > 0 {
  713. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry1 {
  714. faultLabel = f(data)
  715. if faultLabel != "" {
  716. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  717. subscribersTimes[i] = time.Now()
  718. goto TriggerSuccess
  719. }
  720. }
  721. }
  722. if len(masterConfig.RuleOfLidarPretreatmentOdometry3) > 0 {
  723. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry3 {
  724. faultLabel = f(shareVars, data)
  725. if faultLabel != "" {
  726. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  727. subscribersTimes[i] = time.Now()
  728. goto TriggerSuccess
  729. }
  730. }
  731. }
  732. TriggerSuccess:
  733. subscribersMutexes[i].Unlock()
  734. }
  735. subscribersTimeMutexes[i].Unlock()
  736. },
  737. })
  738. }
  739. // 17
  740. if topic == masterConfig.TopicOfLidarRoi &&
  741. (len(masterConfig.RuleOfLidarRoi1) > 0 ||
  742. len(masterConfig.RuleOfLidarRoi3) > 0) {
  743. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  744. Node: commonConfig.RosNode,
  745. Topic: topic,
  746. Callback: func(data *geometry_msgs.PolygonStamped) {
  747. subscribersTimeMutexes[i].Lock()
  748. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  749. subscribersMutexes[i].Lock()
  750. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  751. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  752. faultLabel := ""
  753. if len(masterConfig.RuleOfLidarRoi1) > 0 {
  754. for _, f := range masterConfig.RuleOfLidarRoi1 {
  755. faultLabel = f(data)
  756. if faultLabel != "" {
  757. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  758. subscribersTimes[i] = time.Now()
  759. goto TriggerSuccess
  760. }
  761. }
  762. }
  763. if len(masterConfig.RuleOfLidarRoi3) > 0 {
  764. for _, f := range masterConfig.RuleOfLidarRoi3 {
  765. faultLabel = f(shareVars, data)
  766. if faultLabel != "" {
  767. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  768. subscribersTimes[i] = time.Now()
  769. goto TriggerSuccess
  770. }
  771. }
  772. }
  773. TriggerSuccess:
  774. subscribersMutexes[i].Unlock()
  775. }
  776. subscribersTimeMutexes[i].Unlock()
  777. },
  778. })
  779. }
  780. // 18
  781. if topic == masterConfig.TopicOfLine1 &&
  782. (len(masterConfig.RuleOfLine11) > 0 ||
  783. len(masterConfig.RuleOfLine13) > 0) {
  784. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  785. Node: commonConfig.RosNode,
  786. Topic: topic,
  787. Callback: func(data *nav_msgs.Path) {
  788. subscribersTimeMutexes[i].Lock()
  789. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  790. subscribersMutexes[i].Lock()
  791. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  792. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  793. faultLabel := ""
  794. if len(masterConfig.RuleOfLine11) > 0 {
  795. for _, f := range masterConfig.RuleOfLine11 {
  796. faultLabel = f(data)
  797. if faultLabel != "" {
  798. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  799. subscribersTimes[i] = time.Now()
  800. goto TriggerSuccess
  801. }
  802. }
  803. }
  804. if len(masterConfig.RuleOfLine13) > 0 {
  805. for _, f := range masterConfig.RuleOfLine13 {
  806. faultLabel = f(shareVars, data)
  807. if faultLabel != "" {
  808. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  809. subscribersTimes[i] = time.Now()
  810. goto TriggerSuccess
  811. }
  812. }
  813. }
  814. TriggerSuccess:
  815. subscribersMutexes[i].Unlock()
  816. }
  817. subscribersTimeMutexes[i].Unlock()
  818. },
  819. })
  820. }
  821. // 19
  822. if topic == masterConfig.TopicOfLine2 &&
  823. (len(masterConfig.RuleOfLine21) > 0 ||
  824. len(masterConfig.RuleOfLine23) > 0) {
  825. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  826. Node: commonConfig.RosNode,
  827. Topic: topic,
  828. Callback: func(data *nav_msgs.Path) {
  829. subscribersTimeMutexes[i].Lock()
  830. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  831. subscribersMutexes[i].Lock()
  832. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  833. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  834. faultLabel := ""
  835. if len(masterConfig.RuleOfLine21) > 0 {
  836. for _, f := range masterConfig.RuleOfLine21 {
  837. faultLabel = f(data)
  838. if faultLabel != "" {
  839. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  840. subscribersTimes[i] = time.Now()
  841. goto TriggerSuccess
  842. }
  843. }
  844. }
  845. if len(masterConfig.RuleOfLine23) > 0 {
  846. for _, f := range masterConfig.RuleOfLine23 {
  847. faultLabel = f(shareVars, data)
  848. if faultLabel != "" {
  849. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  850. subscribersTimes[i] = time.Now()
  851. goto TriggerSuccess
  852. }
  853. }
  854. }
  855. TriggerSuccess:
  856. subscribersMutexes[i].Unlock()
  857. }
  858. subscribersTimeMutexes[i].Unlock()
  859. },
  860. })
  861. }
  862. // 20
  863. if topic == masterConfig.TopicOfMapPolygon &&
  864. (len(masterConfig.RuleOfMapPolygon1) > 0 ||
  865. len(masterConfig.RuleOfMapPolygon3) > 0) {
  866. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  867. Node: commonConfig.RosNode,
  868. Topic: topic,
  869. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  870. subscribersTimeMutexes[i].Lock()
  871. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  872. subscribersMutexes[i].Lock()
  873. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  874. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  875. faultLabel := ""
  876. if len(masterConfig.RuleOfMapPolygon1) > 0 {
  877. for _, f := range masterConfig.RuleOfMapPolygon1 {
  878. faultLabel = f(data)
  879. if faultLabel != "" {
  880. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  881. subscribersTimes[i] = time.Now()
  882. goto TriggerSuccess
  883. }
  884. }
  885. }
  886. if len(masterConfig.RuleOfMapPolygon3) > 0 {
  887. for _, f := range masterConfig.RuleOfMapPolygon3 {
  888. faultLabel = f(shareVars, data)
  889. if faultLabel != "" {
  890. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  891. subscribersTimes[i] = time.Now()
  892. goto TriggerSuccess
  893. }
  894. }
  895. }
  896. TriggerSuccess:
  897. subscribersMutexes[i].Unlock()
  898. }
  899. subscribersTimeMutexes[i].Unlock()
  900. },
  901. })
  902. }
  903. // 21
  904. if topic == masterConfig.TopicOfObstacleDisplay &&
  905. (len(masterConfig.RuleOfObstacleDisplay1) > 0 ||
  906. len(masterConfig.RuleOfObstacleDisplay3) > 0) {
  907. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  908. Node: commonConfig.RosNode,
  909. Topic: topic,
  910. Callback: func(data *visualization_msgs.MarkerArray) {
  911. subscribersTimeMutexes[i].Lock()
  912. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  913. subscribersMutexes[i].Lock()
  914. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  915. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  916. faultLabel := ""
  917. if len(masterConfig.RuleOfObstacleDisplay1) > 0 {
  918. for _, f := range masterConfig.RuleOfObstacleDisplay1 {
  919. faultLabel = f(data)
  920. if faultLabel != "" {
  921. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  922. subscribersTimes[i] = time.Now()
  923. goto TriggerSuccess
  924. }
  925. }
  926. }
  927. if len(masterConfig.RuleOfObstacleDisplay3) > 0 {
  928. for _, f := range masterConfig.RuleOfObstacleDisplay3 {
  929. faultLabel = f(shareVars, data)
  930. if faultLabel != "" {
  931. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  932. subscribersTimes[i] = time.Now()
  933. goto TriggerSuccess
  934. }
  935. }
  936. }
  937. TriggerSuccess:
  938. subscribersMutexes[i].Unlock()
  939. }
  940. subscribersTimeMutexes[i].Unlock()
  941. },
  942. })
  943. }
  944. // 22 有共享变量的订阅者必须被创建
  945. if topic == masterConfig.TopicOfPjControlPub {
  946. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  947. Node: commonConfig.RosNode,
  948. Topic: topic,
  949. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  950. subscribersTimeMutexes[i].Lock()
  951. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  952. subscribersMutexes[i].Lock()
  953. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  954. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  955. faultLabel := ""
  956. if len(masterConfig.RuleOfPjControlPub1) > 0 {
  957. for _, f := range masterConfig.RuleOfPjControlPub1 {
  958. faultLabel = f(data)
  959. if faultLabel != "" {
  960. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  961. subscribersTimes[i] = time.Now()
  962. goto TriggerSuccess
  963. }
  964. }
  965. }
  966. if len(masterConfig.RuleOfPjControlPub3) > 0 {
  967. for _, f := range masterConfig.RuleOfPjControlPub3 {
  968. faultLabel = f(shareVars, data)
  969. if faultLabel != "" {
  970. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  971. subscribersTimes[i] = time.Now()
  972. goto TriggerSuccess
  973. }
  974. }
  975. }
  976. TriggerSuccess:
  977. subscribersMutexes[i].Unlock()
  978. }
  979. subscribersTimeMutexes[i].Unlock()
  980. // 更新共享变量
  981. numCountPjiControlCommandOfPjControlPub++
  982. if numCountPjiControlCommandOfPjControlPub == 10 {
  983. egoSteeringCmdOfPjControlPub = append(egoSteeringCmdOfPjControlPub, data.ICPVCmdStrAngle)
  984. egoThrottleCmdOfPjControlPub = append(egoThrottleCmdOfPjControlPub, data.ICPVCmdAccPelPosAct)
  985. numCountPjiControlCommandOfPjControlPub = 0
  986. }
  987. shareVars.Store("NumCountPjiControlCommandOfPjControlPub", numCountPjiControlCommandOfPjControlPub)
  988. shareVars.Store("EgoSteeringCmdOfPjControlPub", egoSteeringCmdOfPjControlPub)
  989. shareVars.Store("EgoThrottleCmdOfPjControlPub", egoThrottleCmdOfPjControlPub)
  990. },
  991. })
  992. }
  993. // 23
  994. if topic == masterConfig.TopicOfPointsCluster &&
  995. (len(masterConfig.RuleOfPointsCluster1) > 0 ||
  996. len(masterConfig.RuleOfPointsCluster3) > 0) {
  997. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  998. Node: commonConfig.RosNode,
  999. Topic: topic,
  1000. Callback: func(data *sensor_msgs.PointCloud2) {
  1001. subscribersTimeMutexes[i].Lock()
  1002. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1003. subscribersMutexes[i].Lock()
  1004. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1005. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1006. faultLabel := ""
  1007. if len(masterConfig.RuleOfPointsCluster1) > 0 {
  1008. for _, f := range masterConfig.RuleOfPointsCluster1 {
  1009. faultLabel = f(data)
  1010. if faultLabel != "" {
  1011. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1012. subscribersTimes[i] = time.Now()
  1013. goto TriggerSuccess
  1014. }
  1015. }
  1016. }
  1017. if len(masterConfig.RuleOfPointsCluster3) > 0 {
  1018. for _, f := range masterConfig.RuleOfPointsCluster3 {
  1019. faultLabel = f(shareVars, data)
  1020. if faultLabel != "" {
  1021. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1022. subscribersTimes[i] = time.Now()
  1023. goto TriggerSuccess
  1024. }
  1025. }
  1026. }
  1027. TriggerSuccess:
  1028. subscribersMutexes[i].Unlock()
  1029. }
  1030. subscribersTimeMutexes[i].Unlock()
  1031. },
  1032. })
  1033. }
  1034. // 24
  1035. if topic == masterConfig.TopicOfPointsConcat &&
  1036. (len(masterConfig.RuleOfPointsConcat1) > 0 ||
  1037. len(masterConfig.RuleOfPointsConcat3) > 0) {
  1038. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1039. Node: commonConfig.RosNode,
  1040. Topic: topic,
  1041. Callback: func(data *sensor_msgs.PointCloud2) {
  1042. subscribersTimeMutexes[i].Lock()
  1043. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1044. subscribersMutexes[i].Lock()
  1045. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1046. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1047. faultLabel := ""
  1048. if len(masterConfig.RuleOfPointsConcat1) > 0 {
  1049. for _, f := range masterConfig.RuleOfPointsConcat1 {
  1050. faultLabel = f(data)
  1051. if faultLabel != "" {
  1052. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1053. subscribersTimes[i] = time.Now()
  1054. goto TriggerSuccess
  1055. }
  1056. }
  1057. }
  1058. if len(masterConfig.RuleOfPointsConcat3) > 0 {
  1059. for _, f := range masterConfig.RuleOfPointsConcat3 {
  1060. faultLabel = f(shareVars, data)
  1061. if faultLabel != "" {
  1062. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1063. subscribersTimes[i] = time.Now()
  1064. goto TriggerSuccess
  1065. }
  1066. }
  1067. }
  1068. TriggerSuccess:
  1069. subscribersMutexes[i].Unlock()
  1070. }
  1071. subscribersTimeMutexes[i].Unlock()
  1072. },
  1073. })
  1074. }
  1075. // 25
  1076. if topic == masterConfig.TopicOfReferenceDisplay &&
  1077. (len(masterConfig.RuleOfReferenceDisplay1) > 0 ||
  1078. len(masterConfig.RuleOfReferenceDisplay3) > 0) {
  1079. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1080. Node: commonConfig.RosNode,
  1081. Topic: topic,
  1082. Callback: func(data *nav_msgs.Path) {
  1083. subscribersTimeMutexes[i].Lock()
  1084. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1085. subscribersMutexes[i].Lock()
  1086. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1087. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1088. faultLabel := ""
  1089. if len(masterConfig.RuleOfReferenceDisplay1) > 0 {
  1090. for _, f := range masterConfig.RuleOfReferenceDisplay1 {
  1091. faultLabel = f(data)
  1092. if faultLabel != "" {
  1093. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1094. subscribersTimes[i] = time.Now()
  1095. goto TriggerSuccess
  1096. }
  1097. }
  1098. }
  1099. if len(masterConfig.RuleOfReferenceDisplay3) > 0 {
  1100. for _, f := range masterConfig.RuleOfReferenceDisplay3 {
  1101. faultLabel = f(shareVars, data)
  1102. if faultLabel != "" {
  1103. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1104. subscribersTimes[i] = time.Now()
  1105. goto TriggerSuccess
  1106. }
  1107. }
  1108. }
  1109. TriggerSuccess:
  1110. subscribersMutexes[i].Unlock()
  1111. }
  1112. subscribersTimeMutexes[i].Unlock()
  1113. },
  1114. })
  1115. }
  1116. // 26
  1117. if topic == masterConfig.TopicOfReferenceTrajectory &&
  1118. (len(masterConfig.RuleOfReferenceTrajectory1) > 0 ||
  1119. len(masterConfig.RuleOfReferenceTrajectory3) > 0) {
  1120. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1121. Node: commonConfig.RosNode,
  1122. Topic: topic,
  1123. Callback: func(data *pjisuv_msgs.Trajectory) {
  1124. subscribersTimeMutexes[i].Lock()
  1125. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1126. subscribersMutexes[i].Lock()
  1127. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1128. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1129. faultLabel := ""
  1130. if len(masterConfig.RuleOfReferenceTrajectory1) > 0 {
  1131. for _, f := range masterConfig.RuleOfReferenceTrajectory1 {
  1132. faultLabel = f(data)
  1133. if faultLabel != "" {
  1134. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1135. subscribersTimes[i] = time.Now()
  1136. goto TriggerSuccess
  1137. }
  1138. }
  1139. }
  1140. if len(masterConfig.RuleOfReferenceTrajectory3) > 0 {
  1141. for _, f := range masterConfig.RuleOfReferenceTrajectory3 {
  1142. faultLabel = f(shareVars, data)
  1143. if faultLabel != "" {
  1144. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1145. subscribersTimes[i] = time.Now()
  1146. goto TriggerSuccess
  1147. }
  1148. }
  1149. }
  1150. TriggerSuccess:
  1151. subscribersMutexes[i].Unlock()
  1152. }
  1153. subscribersTimeMutexes[i].Unlock()
  1154. },
  1155. })
  1156. }
  1157. // 27
  1158. if topic == masterConfig.TopicOfRoiPoints &&
  1159. (len(masterConfig.RuleOfRoiPoints1) > 0 ||
  1160. len(masterConfig.RuleOfRoiPoints3) > 0) {
  1161. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1162. Node: commonConfig.RosNode,
  1163. Topic: topic,
  1164. Callback: func(data *sensor_msgs.PointCloud2) {
  1165. subscribersTimeMutexes[i].Lock()
  1166. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1167. subscribersMutexes[i].Lock()
  1168. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1169. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1170. faultLabel := ""
  1171. if len(masterConfig.RuleOfRoiPoints1) > 0 {
  1172. for _, f := range masterConfig.RuleOfRoiPoints1 {
  1173. faultLabel = f(data)
  1174. if faultLabel != "" {
  1175. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1176. subscribersTimes[i] = time.Now()
  1177. goto TriggerSuccess
  1178. }
  1179. }
  1180. }
  1181. if len(masterConfig.RuleOfRoiPoints3) > 0 {
  1182. for _, f := range masterConfig.RuleOfRoiPoints3 {
  1183. faultLabel = f(shareVars, data)
  1184. if faultLabel != "" {
  1185. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1186. subscribersTimes[i] = time.Now()
  1187. goto TriggerSuccess
  1188. }
  1189. }
  1190. }
  1191. TriggerSuccess:
  1192. subscribersMutexes[i].Unlock()
  1193. }
  1194. subscribersTimeMutexes[i].Unlock()
  1195. },
  1196. })
  1197. }
  1198. // 28
  1199. if topic == masterConfig.TopicOfRoiPolygon &&
  1200. (len(masterConfig.RuleOfRoiPolygon1) > 0 ||
  1201. len(masterConfig.RuleOfRoiPolygon3) > 0) {
  1202. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1203. Node: commonConfig.RosNode,
  1204. Topic: topic,
  1205. Callback: func(data *nav_msgs.Path) {
  1206. subscribersTimeMutexes[i].Lock()
  1207. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1208. subscribersMutexes[i].Lock()
  1209. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1210. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1211. faultLabel := ""
  1212. if len(masterConfig.RuleOfRoiPolygon1) > 0 {
  1213. for _, f := range masterConfig.RuleOfRoiPolygon1 {
  1214. faultLabel = f(data)
  1215. if faultLabel != "" {
  1216. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1217. subscribersTimes[i] = time.Now()
  1218. goto TriggerSuccess
  1219. }
  1220. }
  1221. }
  1222. if len(masterConfig.RuleOfRoiPolygon3) > 0 {
  1223. for _, f := range masterConfig.RuleOfRoiPolygon3 {
  1224. faultLabel = f(shareVars, data)
  1225. if faultLabel != "" {
  1226. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1227. subscribersTimes[i] = time.Now()
  1228. goto TriggerSuccess
  1229. }
  1230. }
  1231. }
  1232. TriggerSuccess:
  1233. subscribersMutexes[i].Unlock()
  1234. }
  1235. subscribersTimeMutexes[i].Unlock()
  1236. },
  1237. })
  1238. }
  1239. // 29
  1240. if topic == masterConfig.TopicOfTf &&
  1241. (len(masterConfig.RuleOfTf1) > 0 ||
  1242. len(masterConfig.RuleOfTf3) > 0) {
  1243. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1244. Node: commonConfig.RosNode,
  1245. Topic: topic,
  1246. Callback: func(data *tf2_msgs.TFMessage) {
  1247. subscribersTimeMutexes[i].Lock()
  1248. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1249. subscribersMutexes[i].Lock()
  1250. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1251. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1252. faultLabel := ""
  1253. if len(masterConfig.RuleOfTf1) > 0 {
  1254. for _, f := range masterConfig.RuleOfTf1 {
  1255. faultLabel = f(data)
  1256. if faultLabel != "" {
  1257. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1258. subscribersTimes[i] = time.Now()
  1259. goto TriggerSuccess
  1260. }
  1261. }
  1262. }
  1263. if len(masterConfig.RuleOfTf3) > 0 {
  1264. for _, f := range masterConfig.RuleOfTf3 {
  1265. faultLabel = f(shareVars, data)
  1266. if faultLabel != "" {
  1267. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1268. subscribersTimes[i] = time.Now()
  1269. goto TriggerSuccess
  1270. }
  1271. }
  1272. }
  1273. TriggerSuccess:
  1274. subscribersMutexes[i].Unlock()
  1275. }
  1276. subscribersTimeMutexes[i].Unlock()
  1277. },
  1278. })
  1279. }
  1280. // 30 有共享变量的订阅者必须被创建
  1281. if topic == masterConfig.TopicOfTpperception {
  1282. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1283. Node: commonConfig.RosNode,
  1284. Topic: topic,
  1285. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  1286. subscribersTimeMutexes[i].Lock()
  1287. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1288. subscribersMutexes[i].Lock()
  1289. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1290. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1291. faultLabel := ""
  1292. if len(masterConfig.RuleOfTpperception1) > 0 {
  1293. for _, f := range masterConfig.RuleOfTpperception1 {
  1294. faultLabel = f(data)
  1295. if faultLabel != "" {
  1296. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1297. subscribersTimes[i] = time.Now()
  1298. goto TriggerSuccess
  1299. }
  1300. }
  1301. }
  1302. if len(masterConfig.RuleOfTpperception3) > 0 {
  1303. for _, f := range masterConfig.RuleOfTpperception3 {
  1304. faultLabel = f(shareVars, data)
  1305. if faultLabel != "" {
  1306. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1307. subscribersTimes[i] = time.Now()
  1308. goto TriggerSuccess
  1309. }
  1310. }
  1311. }
  1312. TriggerSuccess:
  1313. subscribersMutexes[i].Unlock()
  1314. }
  1315. subscribersTimeMutexes[i].Unlock()
  1316. // 更新共享变量
  1317. for _, obj := range data.Objs {
  1318. if obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  1319. continue
  1320. }
  1321. if _, ok := objDicOfTpperception[obj.Id]; !ok {
  1322. objDicOfTpperception[obj.Id] = []float32{}
  1323. }
  1324. objDicOfTpperception[obj.Id] = append(objDicOfTpperception[obj.Id], obj.Y)
  1325. objTypeDicOfTpperception[obj.Id] = obj.Type
  1326. objSpeedDicOfTpperception[obj.Id] = math.Pow(math.Pow(float64(obj.Vxabs), 2)+math.Pow(float64(obj.Vyabs), 2), 0.5)
  1327. }
  1328. shareVars.Store("ObjDicOfTpperception", objDicOfTpperception)
  1329. shareVars.Store("ObjTypeDicOfTpperception", objTypeDicOfTpperception)
  1330. shareVars.Store("ObjSpeedDicOfTpperception", objSpeedDicOfTpperception)
  1331. },
  1332. })
  1333. }
  1334. // 31
  1335. if topic == masterConfig.TopicOfTpperceptionVis &&
  1336. (len(masterConfig.RuleOfTpperceptionVis1) > 0 ||
  1337. len(masterConfig.RuleOfTpperceptionVis3) > 0) {
  1338. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1339. Node: commonConfig.RosNode,
  1340. Topic: topic,
  1341. Callback: func(data *visualization_msgs.MarkerArray) {
  1342. subscribersTimeMutexes[i].Lock()
  1343. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1344. subscribersMutexes[i].Lock()
  1345. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1346. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1347. faultLabel := ""
  1348. if len(masterConfig.RuleOfTpperceptionVis1) > 0 {
  1349. for _, f := range masterConfig.RuleOfTpperceptionVis1 {
  1350. faultLabel = f(data)
  1351. if faultLabel != "" {
  1352. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1353. subscribersTimes[i] = time.Now()
  1354. goto TriggerSuccess
  1355. }
  1356. }
  1357. }
  1358. if len(masterConfig.RuleOfTpperceptionVis3) > 0 {
  1359. for _, f := range masterConfig.RuleOfTpperceptionVis3 {
  1360. faultLabel = f(shareVars, data)
  1361. if faultLabel != "" {
  1362. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1363. subscribersTimes[i] = time.Now()
  1364. goto TriggerSuccess
  1365. }
  1366. }
  1367. }
  1368. TriggerSuccess:
  1369. subscribersMutexes[i].Unlock()
  1370. }
  1371. subscribersTimeMutexes[i].Unlock()
  1372. },
  1373. })
  1374. }
  1375. // 32
  1376. if topic == masterConfig.TopicOfTprouteplan &&
  1377. (len(masterConfig.RuleOfTprouteplan1) > 0 ||
  1378. len(masterConfig.RuleOfTprouteplan3) > 0) {
  1379. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1380. Node: commonConfig.RosNode,
  1381. Topic: topic,
  1382. Callback: func(data *pjisuv_msgs.RoutePlan) {
  1383. subscribersTimeMutexes[i].Lock()
  1384. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1385. subscribersMutexes[i].Lock()
  1386. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1387. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1388. faultLabel := ""
  1389. if len(masterConfig.RuleOfTprouteplan1) > 0 {
  1390. for _, f := range masterConfig.RuleOfTprouteplan1 {
  1391. faultLabel = f(data)
  1392. if faultLabel != "" {
  1393. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1394. subscribersTimes[i] = time.Now()
  1395. goto TriggerSuccess
  1396. }
  1397. }
  1398. }
  1399. if len(masterConfig.RuleOfTprouteplan3) > 0 {
  1400. for _, f := range masterConfig.RuleOfTprouteplan3 {
  1401. faultLabel = f(shareVars, data)
  1402. if faultLabel != "" {
  1403. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1404. subscribersTimes[i] = time.Now()
  1405. goto TriggerSuccess
  1406. }
  1407. }
  1408. }
  1409. TriggerSuccess:
  1410. subscribersMutexes[i].Unlock()
  1411. }
  1412. subscribersTimeMutexes[i].Unlock()
  1413. },
  1414. })
  1415. }
  1416. // 33
  1417. if topic == masterConfig.TopicOfTrajectoryDisplay &&
  1418. (len(masterConfig.RuleOfTrajectoryDisplay1) > 0 ||
  1419. len(masterConfig.RuleOfTrajectoryDisplay3) > 0) {
  1420. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1421. Node: commonConfig.RosNode,
  1422. Topic: topic,
  1423. Callback: func(data *nav_msgs.Path) {
  1424. subscribersTimeMutexes[i].Lock()
  1425. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1426. subscribersMutexes[i].Lock()
  1427. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1428. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1429. faultLabel := ""
  1430. if len(masterConfig.RuleOfTrajectoryDisplay1) > 0 {
  1431. for _, f := range masterConfig.RuleOfTrajectoryDisplay1 {
  1432. faultLabel = f(data)
  1433. if faultLabel != "" {
  1434. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1435. subscribersTimes[i] = time.Now()
  1436. goto TriggerSuccess
  1437. }
  1438. }
  1439. }
  1440. if len(masterConfig.RuleOfTrajectoryDisplay3) > 0 {
  1441. for _, f := range masterConfig.RuleOfTrajectoryDisplay3 {
  1442. faultLabel = f(shareVars, data)
  1443. if faultLabel != "" {
  1444. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1445. subscribersTimes[i] = time.Now()
  1446. goto TriggerSuccess
  1447. }
  1448. }
  1449. }
  1450. TriggerSuccess:
  1451. subscribersMutexes[i].Unlock()
  1452. }
  1453. subscribersTimeMutexes[i].Unlock()
  1454. },
  1455. })
  1456. }
  1457. // 34
  1458. if topic == masterConfig.TopicOfUngroundCloudpoints &&
  1459. (len(masterConfig.RuleOfUngroundCloudpoints1) > 0 ||
  1460. len(masterConfig.RuleOfUngroundCloudpoints3) > 0) {
  1461. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1462. Node: commonConfig.RosNode,
  1463. Topic: topic,
  1464. Callback: func(data *sensor_msgs.PointCloud2) {
  1465. subscribersTimeMutexes[i].Lock()
  1466. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1467. subscribersMutexes[i].Lock()
  1468. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1469. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1470. faultLabel := ""
  1471. if len(masterConfig.RuleOfUngroundCloudpoints1) > 0 {
  1472. for _, f := range masterConfig.RuleOfUngroundCloudpoints1 {
  1473. faultLabel = f(data)
  1474. if faultLabel != "" {
  1475. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1476. subscribersTimes[i] = time.Now()
  1477. goto TriggerSuccess
  1478. }
  1479. }
  1480. }
  1481. if len(masterConfig.RuleOfUngroundCloudpoints3) > 0 {
  1482. for _, f := range masterConfig.RuleOfUngroundCloudpoints3 {
  1483. faultLabel = f(shareVars, data)
  1484. if faultLabel != "" {
  1485. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1486. subscribersTimes[i] = time.Now()
  1487. goto TriggerSuccess
  1488. }
  1489. }
  1490. }
  1491. TriggerSuccess:
  1492. subscribersMutexes[i].Unlock()
  1493. }
  1494. subscribersTimeMutexes[i].Unlock()
  1495. },
  1496. })
  1497. }
  1498. // 35
  1499. if topic == masterConfig.TopicOfCameraImage &&
  1500. (len(masterConfig.RuleOfCameraImage1) > 0 ||
  1501. len(masterConfig.RuleOfCameraImage3) > 0) {
  1502. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1503. Node: commonConfig.RosNode,
  1504. Topic: topic,
  1505. Callback: func(data *sensor_msgs.Image) {
  1506. subscribersTimeMutexes[i].Lock()
  1507. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1508. subscribersMutexes[i].Lock()
  1509. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1510. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1511. faultLabel := ""
  1512. if len(masterConfig.RuleOfCameraImage1) > 0 {
  1513. for _, f := range masterConfig.RuleOfCameraImage1 {
  1514. faultLabel = f(data)
  1515. if faultLabel != "" {
  1516. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1517. subscribersTimes[i] = time.Now()
  1518. goto TriggerSuccess
  1519. }
  1520. }
  1521. }
  1522. if len(masterConfig.RuleOfCameraImage3) > 0 {
  1523. for _, f := range masterConfig.RuleOfCameraImage3 {
  1524. faultLabel = f(shareVars, data)
  1525. if faultLabel != "" {
  1526. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1527. subscribersTimes[i] = time.Now()
  1528. goto TriggerSuccess
  1529. }
  1530. }
  1531. }
  1532. TriggerSuccess:
  1533. subscribersMutexes[i].Unlock()
  1534. }
  1535. subscribersTimeMutexes[i].Unlock()
  1536. },
  1537. })
  1538. }
  1539. // 36 有共享变量的订阅者必须被创建
  1540. if topic == masterConfig.TopicOfDataRead {
  1541. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1542. Node: commonConfig.RosNode,
  1543. Topic: topic,
  1544. Callback: func(data *pjisuv_msgs.Retrieval) {
  1545. subscribersTimeMutexes[i].Lock()
  1546. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1547. subscribersMutexes[i].Lock()
  1548. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1549. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1550. faultLabel := ""
  1551. if len(masterConfig.RuleOfDataRead1) > 0 {
  1552. for _, f := range masterConfig.RuleOfDataRead1 {
  1553. faultLabel = f(data)
  1554. if faultLabel != "" {
  1555. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1556. subscribersTimes[i] = time.Now()
  1557. goto TriggerSuccess
  1558. }
  1559. }
  1560. }
  1561. if len(masterConfig.RuleOfDataRead3) > 0 {
  1562. for _, f := range masterConfig.RuleOfDataRead3 {
  1563. faultLabel = f(shareVars, data)
  1564. if faultLabel != "" {
  1565. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1566. subscribersTimes[i] = time.Now()
  1567. goto TriggerSuccess
  1568. }
  1569. }
  1570. }
  1571. TriggerSuccess:
  1572. subscribersMutexes[i].Unlock()
  1573. }
  1574. subscribersTimeMutexes[i].Unlock()
  1575. // 更新共享变量
  1576. numCountDataReadOfDataRead++
  1577. if numCountDataReadOfDataRead == 10 {
  1578. egoSteeringRealOfDataRead = append(egoSteeringRealOfDataRead, data.ActStrWhAng)
  1579. egoThrottleRealOfDataRead = append(egoThrottleRealOfDataRead, data.AccPed2)
  1580. numCountDataReadOfDataRead = 0
  1581. }
  1582. shareVars.Store("NumCountDataReadOfDataRead", numCountDataReadOfDataRead)
  1583. shareVars.Store("EgoSteeringRealOfDataRead", egoSteeringRealOfDataRead)
  1584. shareVars.Store("EgoThrottleRealOfDataRead", egoThrottleRealOfDataRead)
  1585. shareVars.Store("ActStrWhAngOfDataRead", data.ActStrWhAng)
  1586. },
  1587. })
  1588. }
  1589. // 37
  1590. if topic == masterConfig.TopicOfPjiGps &&
  1591. (len(masterConfig.RuleOfPjiGps1) > 0 ||
  1592. len(masterConfig.RuleOfPjiGps3) > 0) {
  1593. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1594. Node: commonConfig.RosNode,
  1595. Topic: topic,
  1596. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  1597. subscribersTimeMutexes[i].Lock()
  1598. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1599. subscribersMutexes[i].Lock()
  1600. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1601. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1602. faultLabel := ""
  1603. if len(masterConfig.RuleOfPjiGps1) > 0 {
  1604. for _, f := range masterConfig.RuleOfPjiGps1 {
  1605. faultLabel = f(data)
  1606. if faultLabel != "" {
  1607. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1608. subscribersTimes[i] = time.Now()
  1609. goto TriggerSuccess
  1610. }
  1611. }
  1612. }
  1613. if len(masterConfig.RuleOfPjiGps3) > 0 {
  1614. for _, f := range masterConfig.RuleOfPjiGps3 {
  1615. faultLabel = f(shareVars, data)
  1616. if faultLabel != "" {
  1617. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1618. subscribersTimes[i] = time.Now()
  1619. goto TriggerSuccess
  1620. }
  1621. }
  1622. }
  1623. TriggerSuccess:
  1624. subscribersMutexes[i].Unlock()
  1625. }
  1626. subscribersTimeMutexes[i].Unlock()
  1627. },
  1628. })
  1629. }
  1630. // 39 有共享变量的订阅者必须被创建
  1631. if topic == masterConfig.TopicOfPjVehicleFdbPub {
  1632. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1633. Node: commonConfig.RosNode,
  1634. Topic: topic,
  1635. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  1636. subscribersTimeMutexes[i].Lock()
  1637. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1638. subscribersMutexes[i].Lock()
  1639. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1640. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1641. faultLabel := ""
  1642. if len(masterConfig.RuleOfPjVehicleFdbPub1) > 0 {
  1643. for _, f := range masterConfig.RuleOfPjVehicleFdbPub1 {
  1644. faultLabel = f(data)
  1645. if faultLabel != "" {
  1646. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1647. subscribersTimes[i] = time.Now()
  1648. goto TriggerSuccess
  1649. }
  1650. }
  1651. }
  1652. if len(masterConfig.RuleOfPjVehicleFdbPub3) > 0 {
  1653. for _, f := range masterConfig.RuleOfPjVehicleFdbPub3 {
  1654. faultLabel = f(shareVars, data)
  1655. if faultLabel != "" {
  1656. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1657. subscribersTimes[i] = time.Now()
  1658. goto TriggerSuccess
  1659. }
  1660. }
  1661. }
  1662. TriggerSuccess:
  1663. subscribersMutexes[i].Unlock()
  1664. }
  1665. subscribersTimeMutexes[i].Unlock()
  1666. // 更新共享变量
  1667. shareVars.Store("AutomodeOfPjVehicleFdbPub", data.Automode)
  1668. },
  1669. })
  1670. }
  1671. // 40 有共享变量的订阅者必须被创建
  1672. if topic == masterConfig.TopicOfEndPointMessage {
  1673. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1674. Node: commonConfig.RosNode,
  1675. Topic: topic,
  1676. Callback: func(data *geometry_msgs.Point) {
  1677. subscribersTimeMutexes[i].Lock()
  1678. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1679. subscribersMutexes[i].Lock()
  1680. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1681. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1682. faultLabel := ""
  1683. if len(masterConfig.RuleOfEndPointMessage1) > 0 {
  1684. for _, f := range masterConfig.RuleOfEndPointMessage1 {
  1685. faultLabel = f(data)
  1686. if faultLabel != "" {
  1687. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1688. subscribersTimes[i] = time.Now()
  1689. goto TriggerSuccess
  1690. }
  1691. }
  1692. }
  1693. if len(masterConfig.RuleOfEndPointMessage3) > 0 {
  1694. for _, f := range masterConfig.RuleOfEndPointMessage3 {
  1695. faultLabel = f(shareVars, data)
  1696. if faultLabel != "" {
  1697. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1698. subscribersTimes[i] = time.Now()
  1699. goto TriggerSuccess
  1700. }
  1701. }
  1702. }
  1703. TriggerSuccess:
  1704. subscribersMutexes[i].Unlock()
  1705. }
  1706. subscribersTimeMutexes[i].Unlock()
  1707. // 更新共享变量
  1708. shareVars.Store("EndPointX", data.X)
  1709. shareVars.Store("EndPointY", data.Y)
  1710. },
  1711. })
  1712. }
  1713. if err != nil {
  1714. c_log.GlobalLogger.Info("创建订阅者报错,可能由于节点未启动,再次尝试:", err)
  1715. time.Sleep(time.Duration(2) * time.Second)
  1716. continue
  1717. } else {
  1718. break
  1719. }
  1720. }
  1721. }
  1722. select {
  1723. case signal := <-service.ChannelKillWindowProducer:
  1724. if signal == 1 {
  1725. commonConfig.RosNode.Close()
  1726. service.AddKillTimes("3")
  1727. return
  1728. }
  1729. }
  1730. }
  1731. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  1732. saveTimeWindowMutex.Lock()
  1733. defer saveTimeWindowMutex.Unlock()
  1734. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1735. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) { // 如果是不在旧故障窗口内,添加一个新窗口
  1736. exceptBegin := util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime)
  1737. finalTimeWindowBegin := ""
  1738. if util.TimeCustom1LessEqualThanTimeCustom2(exceptBegin, latestTimeWindowEnd) { // 窗口最早时间不能早于上一个窗口结束时间
  1739. finalTimeWindowBegin = latestTimeWindowEnd
  1740. } else {
  1741. finalTimeWindowBegin = exceptBegin
  1742. }
  1743. latestTimeWindowEnd = util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1744. newTimeWindow := commonEntity.TimeWindow{
  1745. FaultTime: faultHappenTime,
  1746. TimeWindowBegin: finalTimeWindowBegin,
  1747. TimeWindowEnd: latestTimeWindowEnd,
  1748. Length: util.CalculateDifferenceOfTimeCustom(finalTimeWindowBegin, latestTimeWindowEnd),
  1749. Labels: []string{faultLabel},
  1750. MasterTopics: masterTopics,
  1751. SlaveTopics: slaveTopics,
  1752. }
  1753. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1754. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1755. } else { // 如果在旧故障窗口内
  1756. commonEntity.TimeWindowProducerQueueMutex.RLock()
  1757. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  1758. // 更新故障窗口end时间
  1759. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime) // 窗口期望关闭时间是触发时间加上后置时间
  1760. expectLength := util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1761. if expectLength < commonConfig.PlatformConfig.TaskMaxTime {
  1762. latestTimeWindowEnd = expectEnd
  1763. lastTimeWindow.TimeWindowEnd = latestTimeWindowEnd
  1764. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, lastTimeWindow.TimeWindowEnd)
  1765. }
  1766. // 更新label
  1767. labels := lastTimeWindow.Labels
  1768. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1769. // 更新 topic
  1770. sourceMasterTopics := lastTimeWindow.MasterTopics
  1771. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1772. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1773. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1774. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1775. }
  1776. }
  1777. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1778. // 获取所有需要采集的topic
  1779. var faultCodeTopics []string
  1780. for _, code := range commonConfig.CloudConfig.Triggers {
  1781. if code.Label == faultLabel {
  1782. faultCodeTopics = code.Topics
  1783. }
  1784. }
  1785. // 根据不同节点采集的topic进行分配采集
  1786. for _, acceptTopic := range faultCodeTopics {
  1787. for _, host := range commonConfig.CloudConfig.Hosts {
  1788. for _, topic := range host.Topics {
  1789. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1790. masterTopics = append(masterTopics, acceptTopic)
  1791. }
  1792. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1793. slaveTopics = append(slaveTopics, acceptTopic)
  1794. }
  1795. }
  1796. }
  1797. }
  1798. return masterTopics, slaveTopics
  1799. }