produce_window.go 65 KB


  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "cicv-data-closedloop/pjisuv_param"
  11. "github.com/bluenviron/goroslib/v2"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  16. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  17. "math"
  18. "sync"
  19. "time"
  20. )
  21. // All shared variables.
  22. var (
  23. latestTimeWindowEnd = util.GetTimeString(time.Now()) // initialized once, at package load, from the current time
  24. pjisuvParam = pjisuv_param.PjisuvParam{ // shared parameter state; its address is passed to the two-argument rule functions
  25. ObjDicOfTpperception: make(map[uint32][]float32), // maps are pre-allocated so they can be written without a nil-map panic
  26. ObjTypeDicOfTpperception: make(map[uint32]uint8),
  27. ObjSpeedDicOfTpperception: make(map[uint32]float64),
  28. } // the mutex on the next line belongs to /cicv_location
  29. mutexOfCicvLocation sync.RWMutex // held while the location callback copies velocity/yaw/position into pjisuvParam; NOTE(review): that callback takes RLock while writing - a write Lock looks intended, confirm
  30. // /tpperception
  31. mutexOfTpperception sync.RWMutex
  32. // /pj_control_pub
  33. mutexOfPjControlPub sync.RWMutex
  34. // /data_read
  35. mutexOfDataRead sync.RWMutex
  36. // /pj_vehicle_fdb_pub
  37. mutexOfPjVehicleFdbPub sync.RWMutex
  38. // CicvAmrTrajectory topic: held while pjisuvParam.LastCurvaturesOfCicvAmrTrajectory is replaced; NOTE(review): that callback also takes RLock while writing - confirm
  39. mutexOfCicvAmrTrajectory sync.RWMutex
  40. )
  41. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  42. func PrepareTimeWindowProducerQueue() {
  43. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  44. var err error
  45. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  46. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  47. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  48. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  49. for i, topic := range commonConfig.SubscribeTopics {
  50. // 增加了可扩展性
  51. if topic == masterConfig.TopicOfCicvExtend {
  52. go func() {
  53. for {
  54. time.Sleep(time.Duration(3500) * time.Millisecond)
  55. for _, f := range masterConfig.RuleOfCicvExtend {
  56. label := f(pjisuvParam)
  57. if label != "" {
  58. saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
  59. subscribersTimes[i] = time.Now()
  60. break
  61. }
  62. }
  63. }
  64. }()
  65. }
  66. // 其他常规监听器
  67. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  68. // 1
  69. if topic == masterConfig.TopicOfAmrPose && (len(masterConfig.RuleOfAmrPose1) > 0 || len(masterConfig.RuleOfAmrPose2) > 0) {
  70. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  71. Node: commonConfig.RosNode,
  72. Topic: topic,
  73. Callback: func(data *visualization_msgs.MarkerArray) {
  74. subscribersTimeMutexes[i].Lock()
  75. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  76. subscribersMutexes[i].Lock()
  77. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  78. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  79. faultLabel := ""
  80. if len(masterConfig.RuleOfAmrPose1) > 0 {
  81. for _, f := range masterConfig.RuleOfAmrPose1 {
  82. faultLabel = f(data)
  83. if faultLabel != "" {
  84. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  85. subscribersTimes[i] = time.Now()
  86. goto TriggerSuccess
  87. }
  88. }
  89. }
  90. if len(masterConfig.RuleOfAmrPose2) > 0 {
  91. for _, f := range masterConfig.RuleOfAmrPose2 {
  92. faultLabel = f(data, &pjisuvParam)
  93. if faultLabel != "" {
  94. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  95. subscribersTimes[i] = time.Now()
  96. goto TriggerSuccess
  97. }
  98. }
  99. }
  100. TriggerSuccess:
  101. subscribersMutexes[i].Unlock()
  102. }
  103. subscribersTimeMutexes[i].Unlock()
  104. },
  105. })
  106. }
  107. // 2
  108. if topic == masterConfig.TopicOfBoundingBoxesFast && (len(masterConfig.RuleOfBoundingBoxesFast1) > 0 || len(masterConfig.RuleOfBoundingBoxesFast2) > 0) {
  109. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  110. Node: commonConfig.RosNode,
  111. Topic: topic,
  112. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  113. subscribersTimeMutexes[i].Lock()
  114. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  115. subscribersMutexes[i].Lock()
  116. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  117. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  118. faultLabel := ""
  119. if len(masterConfig.RuleOfBoundingBoxesFast1) > 0 {
  120. for _, f := range masterConfig.RuleOfBoundingBoxesFast1 {
  121. faultLabel = f(data)
  122. if faultLabel != "" {
  123. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  124. subscribersTimes[i] = time.Now()
  125. goto TriggerSuccess
  126. }
  127. }
  128. }
  129. if len(masterConfig.RuleOfBoundingBoxesFast2) > 0 {
  130. for _, f := range masterConfig.RuleOfBoundingBoxesFast2 {
  131. faultLabel = f(data, &pjisuvParam)
  132. if faultLabel != "" {
  133. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  134. subscribersTimes[i] = time.Now()
  135. goto TriggerSuccess
  136. }
  137. }
  138. }
  139. TriggerSuccess:
  140. subscribersMutexes[i].Unlock()
  141. }
  142. subscribersTimeMutexes[i].Unlock()
  143. },
  144. })
  145. }
  146. // 3
  147. if topic == masterConfig.TopicOfCameraFault && (len(masterConfig.RuleOfCameraFault1) > 0 || len(masterConfig.RuleOfCameraFault2) > 0) {
  148. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  149. Node: commonConfig.RosNode,
  150. Topic: topic,
  151. Callback: func(data *pjisuv_msgs.FaultVec) {
  152. subscribersTimeMutexes[i].Lock()
  153. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  154. subscribersMutexes[i].Lock()
  155. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  156. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  157. faultLabel := ""
  158. if len(masterConfig.RuleOfCameraFault1) > 0 {
  159. for _, f := range masterConfig.RuleOfCameraFault1 {
  160. faultLabel = f(data)
  161. if faultLabel != "" {
  162. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  163. subscribersTimes[i] = time.Now()
  164. goto TriggerSuccess
  165. }
  166. }
  167. }
  168. if len(masterConfig.RuleOfCameraFault2) > 0 {
  169. for _, f := range masterConfig.RuleOfCameraFault2 {
  170. faultLabel = f(data, &pjisuvParam)
  171. if faultLabel != "" {
  172. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  173. subscribersTimes[i] = time.Now()
  174. goto TriggerSuccess
  175. }
  176. }
  177. }
  178. TriggerSuccess:
  179. subscribersMutexes[i].Unlock()
  180. }
  181. subscribersTimeMutexes[i].Unlock()
  182. },
  183. })
  184. }
  185. // 4
  186. if topic == masterConfig.TopicOfCanData && (len(masterConfig.RuleOfCanData1) > 0 || len(masterConfig.RuleOfCanData2) > 0) {
  187. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  188. Node: commonConfig.RosNode,
  189. Topic: topic,
  190. Callback: func(data *pjisuv_msgs.Frame) {
  191. subscribersTimeMutexes[i].Lock()
  192. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  193. subscribersMutexes[i].Lock()
  194. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  195. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  196. faultLabel := ""
  197. if len(masterConfig.RuleOfCanData1) > 0 {
  198. for _, f := range masterConfig.RuleOfCanData1 {
  199. faultLabel = f(data)
  200. if faultLabel != "" {
  201. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  202. subscribersTimes[i] = time.Now()
  203. goto TriggerSuccess
  204. }
  205. }
  206. }
  207. if len(masterConfig.RuleOfCanData2) > 0 {
  208. for _, f := range masterConfig.RuleOfCanData2 {
  209. faultLabel = f(data, &pjisuvParam)
  210. if faultLabel != "" {
  211. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  212. subscribersTimes[i] = time.Now()
  213. goto TriggerSuccess
  214. }
  215. }
  216. }
  217. TriggerSuccess:
  218. subscribersMutexes[i].Unlock()
  219. }
  220. subscribersTimeMutexes[i].Unlock()
  221. },
  222. })
  223. }
  224. // 5
  225. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && (len(masterConfig.RuleOfCh128x1LslidarPointCloud1) > 0 || len(masterConfig.RuleOfCh128x1LslidarPointCloud2) > 0) {
  226. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  227. Node: commonConfig.RosNode,
  228. Topic: topic,
  229. Callback: func(data *sensor_msgs.PointCloud2) {
  230. subscribersTimeMutexes[i].Lock()
  231. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  232. subscribersMutexes[i].Lock()
  233. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  234. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  235. faultLabel := ""
  236. if len(masterConfig.RuleOfCh128x1LslidarPointCloud1) > 0 {
  237. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud1 {
  238. faultLabel = f(data)
  239. if faultLabel != "" {
  240. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  241. subscribersTimes[i] = time.Now()
  242. goto TriggerSuccess
  243. }
  244. }
  245. }
  246. if len(masterConfig.RuleOfCh128x1LslidarPointCloud2) > 0 {
  247. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud2 {
  248. faultLabel = f(data, &pjisuvParam)
  249. if faultLabel != "" {
  250. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  251. subscribersTimes[i] = time.Now()
  252. goto TriggerSuccess
  253. }
  254. }
  255. }
  256. TriggerSuccess:
  257. subscribersMutexes[i].Unlock()
  258. }
  259. subscribersTimeMutexes[i].Unlock()
  260. },
  261. })
  262. }
  263. // 6
  264. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && (len(masterConfig.RuleOfCh64wLLslidarPointCloud1) > 0 || len(masterConfig.RuleOfCh64wLLslidarPointCloud2) > 1) {
  265. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  266. Node: commonConfig.RosNode,
  267. Topic: topic,
  268. Callback: func(data *sensor_msgs.PointCloud2) {
  269. subscribersTimeMutexes[i].Lock()
  270. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  271. subscribersMutexes[i].Lock()
  272. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  273. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  274. faultLabel := ""
  275. if len(masterConfig.RuleOfCh64wLLslidarPointCloud1) > 0 {
  276. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud1 {
  277. faultLabel = f(data)
  278. if faultLabel != "" {
  279. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  280. subscribersTimes[i] = time.Now()
  281. goto TriggerSuccess
  282. }
  283. }
  284. }
  285. if len(masterConfig.RuleOfCh64wLLslidarPointCloud2) > 0 {
  286. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud2 {
  287. faultLabel = f(data, &pjisuvParam)
  288. if faultLabel != "" {
  289. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  290. subscribersTimes[i] = time.Now()
  291. goto TriggerSuccess
  292. }
  293. }
  294. }
  295. TriggerSuccess:
  296. subscribersMutexes[i].Unlock()
  297. }
  298. subscribersTimeMutexes[i].Unlock()
  299. },
  300. })
  301. }
  302. // 7
  303. if topic == masterConfig.TopicOfCh64wLScan && (len(masterConfig.RuleOfCh64wLScan1) > 0 || len(masterConfig.RuleOfCh64wLScan2) > 0) {
  304. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  305. Node: commonConfig.RosNode,
  306. Topic: topic,
  307. Callback: func(data *sensor_msgs.LaserScan) {
  308. subscribersTimeMutexes[i].Lock()
  309. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  310. subscribersMutexes[i].Lock()
  311. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  312. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  313. faultLabel := ""
  314. if len(masterConfig.RuleOfCh64wLScan1) > 0 {
  315. for _, f := range masterConfig.RuleOfCh64wLScan1 {
  316. faultLabel = f(data)
  317. if faultLabel != "" {
  318. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  319. subscribersTimes[i] = time.Now()
  320. goto TriggerSuccess
  321. }
  322. }
  323. }
  324. if len(masterConfig.RuleOfCh64wLScan2) > 0 {
  325. for _, f := range masterConfig.RuleOfCh64wLScan2 {
  326. faultLabel = f(data, &pjisuvParam)
  327. if faultLabel != "" {
  328. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  329. subscribersTimes[i] = time.Now()
  330. goto TriggerSuccess
  331. }
  332. }
  333. }
  334. TriggerSuccess:
  335. subscribersMutexes[i].Unlock()
  336. }
  337. subscribersTimeMutexes[i].Unlock()
  338. },
  339. })
  340. }
  341. // 8
  342. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && (len(masterConfig.RuleOfCh64wRLslidarPointCloud1) > 0 || len(masterConfig.RuleOfCh64wRLslidarPointCloud2) > 0) {
  343. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  344. Node: commonConfig.RosNode,
  345. Topic: topic,
  346. Callback: func(data *sensor_msgs.PointCloud2) {
  347. subscribersTimeMutexes[i].Lock()
  348. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  349. subscribersMutexes[i].Lock()
  350. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  351. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  352. faultLabel := ""
  353. if len(masterConfig.RuleOfCh64wRLslidarPointCloud1) > 0 {
  354. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud1 {
  355. faultLabel = f(data)
  356. if faultLabel != "" {
  357. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  358. subscribersTimes[i] = time.Now()
  359. goto TriggerSuccess
  360. }
  361. }
  362. }
  363. if len(masterConfig.RuleOfCh64wRLslidarPointCloud2) > 0 {
  364. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud2 {
  365. faultLabel = f(data, &pjisuvParam)
  366. if faultLabel != "" {
  367. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  368. subscribersTimes[i] = time.Now()
  369. goto TriggerSuccess
  370. }
  371. }
  372. }
  373. TriggerSuccess:
  374. subscribersMutexes[i].Unlock()
  375. }
  376. subscribersTimeMutexes[i].Unlock()
  377. },
  378. })
  379. }
  380. // 9
  381. if topic == masterConfig.TopicOfCh64wRScan && (len(masterConfig.RuleOfCh64wRScan1) > 0 || len(masterConfig.RuleOfCh64wRScan2) > 0) {
  382. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  383. Node: commonConfig.RosNode,
  384. Topic: topic,
  385. Callback: func(data *sensor_msgs.LaserScan) {
  386. subscribersTimeMutexes[i].Lock()
  387. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  388. subscribersMutexes[i].Lock()
  389. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  390. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  391. faultLabel := ""
  392. if len(masterConfig.RuleOfCh64wRScan1) > 0 {
  393. for _, f := range masterConfig.RuleOfCh64wRScan1 {
  394. faultLabel = f(data)
  395. if faultLabel != "" {
  396. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  397. subscribersTimes[i] = time.Now()
  398. goto TriggerSuccess
  399. }
  400. }
  401. }
  402. if len(masterConfig.RuleOfCh64wRScan2) > 0 {
  403. for _, f := range masterConfig.RuleOfCh64wRScan2 {
  404. faultLabel = f(data, &pjisuvParam)
  405. if faultLabel != "" {
  406. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  407. subscribersTimes[i] = time.Now()
  408. goto TriggerSuccess
  409. }
  410. }
  411. }
  412. TriggerSuccess:
  413. subscribersMutexes[i].Unlock()
  414. }
  415. subscribersTimeMutexes[i].Unlock()
  416. },
  417. })
  418. }
  419. // 10
  420. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && (len(masterConfig.RuleOfCicvLidarclusterMovingObjects1) > 0 || len(masterConfig.RuleOfCicvLidarclusterMovingObjects2) > 0) {
  421. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  422. Node: commonConfig.RosNode,
  423. Topic: topic,
  424. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  425. subscribersTimeMutexes[i].Lock()
  426. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  427. subscribersMutexes[i].Lock()
  428. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  429. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  430. faultLabel := ""
  431. if len(masterConfig.RuleOfCicvLidarclusterMovingObjects1) > 0 {
  432. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects1 {
  433. faultLabel = f(data)
  434. if faultLabel != "" {
  435. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  436. subscribersTimes[i] = time.Now()
  437. goto TriggerSuccess
  438. }
  439. }
  440. }
  441. if len(masterConfig.RuleOfCicvLidarclusterMovingObjects2) > 0 {
  442. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects2 {
  443. faultLabel = f(data, &pjisuvParam)
  444. if faultLabel != "" {
  445. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  446. subscribersTimes[i] = time.Now()
  447. goto TriggerSuccess
  448. }
  449. }
  450. }
  451. TriggerSuccess:
  452. subscribersMutexes[i].Unlock()
  453. }
  454. subscribersTimeMutexes[i].Unlock()
  455. },
  456. })
  457. }
  458. // 11
  459. if topic == masterConfig.TopicOfCicvAmrTrajectory && (len(masterConfig.RuleOfCicvAmrTrajectory1) > 0 || len(masterConfig.RuleOfCicvAmrTrajectory2) > 0) {
  460. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  461. Node: commonConfig.RosNode,
  462. Topic: topic,
  463. Callback: func(data *pjisuv_msgs.Trajectory) {
  464. subscribersTimeMutexes[i].Lock()
  465. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  466. subscribersMutexes[i].Lock()
  467. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  468. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  469. faultLabel := ""
  470. if len(masterConfig.RuleOfCicvAmrTrajectory1) > 0 {
  471. for _, f := range masterConfig.RuleOfCicvAmrTrajectory1 {
  472. faultLabel = f(data)
  473. if faultLabel != "" {
  474. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  475. subscribersTimes[i] = time.Now()
  476. goto TriggerSuccess
  477. }
  478. }
  479. }
  480. if len(masterConfig.RuleOfCicvAmrTrajectory2) > 0 {
  481. for _, f := range masterConfig.RuleOfCicvAmrTrajectory2 {
  482. faultLabel = f(data, &pjisuvParam)
  483. if faultLabel != "" {
  484. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  485. subscribersTimes[i] = time.Now()
  486. goto TriggerSuccess
  487. }
  488. }
  489. }
  490. TriggerSuccess:
  491. subscribersMutexes[i].Unlock()
  492. }
  493. subscribersTimeMutexes[i].Unlock()
  494. // 触发后更新共享变量
  495. mutexOfCicvAmrTrajectory.RLock()
  496. {
  497. var currentCurvateres []float64
  498. for _, point := range data.Trajectoryinfo.Trajectorypoints {
  499. currentCurvateres = append(currentCurvateres, math.Abs(float64(point.Curvature)))
  500. }
  501. pjisuvParam.LastCurvaturesOfCicvAmrTrajectory = currentCurvateres
  502. }
  503. mutexOfCicvAmrTrajectory.RUnlock()
  504. },
  505. })
  506. }
  507. // 12
  508. if topic == masterConfig.TopicOfCicvLocation && (len(masterConfig.RuleOfCicvLocation1) > 0 || len(masterConfig.RuleOfCicvLocation2) > 0) {
  509. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  510. Node: commonConfig.RosNode,
  511. Topic: topic,
  512. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  513. // 更新共享变量
  514. mutexOfCicvLocation.RLock()
  515. {
  516. pjisuvParam.VelocityXOfCicvLocation = data.VelocityX
  517. pjisuvParam.VelocityYOfCicvLocation = data.VelocityY
  518. pjisuvParam.VelocityZOfCicvLocation = data.VelocityZ
  519. pjisuvParam.YawOfCicvLocation = data.Yaw
  520. pjisuvParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
  521. pjisuvParam.PositionXOfCicvLocation = data.PositionX
  522. pjisuvParam.PositionYOfCicvLocation = data.PositionY
  523. }
  524. mutexOfCicvLocation.RUnlock()
  525. subscribersTimeMutexes[i].Lock()
  526. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  527. subscribersMutexes[i].Lock()
  528. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  529. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  530. faultLabel := ""
  531. if len(masterConfig.RuleOfCicvLocation1) > 0 {
  532. for _, f := range masterConfig.RuleOfCicvLocation1 {
  533. faultLabel = f(data)
  534. if faultLabel != "" {
  535. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  536. subscribersTimes[i] = time.Now()
  537. goto TriggerSuccess
  538. }
  539. }
  540. }
  541. if len(masterConfig.RuleOfCicvLocation2) > 0 {
  542. for _, f := range masterConfig.RuleOfCicvLocation2 {
  543. faultLabel = f(data, &pjisuvParam)
  544. if faultLabel != "" {
  545. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  546. subscribersTimes[i] = time.Now()
  547. goto TriggerSuccess
  548. }
  549. }
  550. }
  551. TriggerSuccess:
  552. subscribersMutexes[i].Unlock()
  553. }
  554. subscribersTimeMutexes[i].Unlock()
  555. },
  556. })
  557. }
  558. // 13
  559. if topic == masterConfig.TopicOfCloudClusters && (len(masterConfig.RuleOfCloudClusters1) > 0 || len(masterConfig.RuleOfCloudClusters2) > 0) {
  560. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  561. Node: commonConfig.RosNode,
  562. Topic: topic,
  563. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  564. subscribersTimeMutexes[i].Lock()
  565. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  566. subscribersMutexes[i].Lock()
  567. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  568. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  569. faultLabel := ""
  570. if len(masterConfig.RuleOfCloudClusters1) > 0 {
  571. for _, f := range masterConfig.RuleOfCloudClusters1 {
  572. faultLabel = f(data)
  573. if faultLabel != "" {
  574. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  575. subscribersTimes[i] = time.Now()
  576. goto TriggerSuccess
  577. }
  578. }
  579. }
  580. if len(masterConfig.RuleOfCloudClusters2) > 0 {
  581. for _, f := range masterConfig.RuleOfCloudClusters2 {
  582. faultLabel = f(data, &pjisuvParam)
  583. if faultLabel != "" {
  584. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  585. subscribersTimes[i] = time.Now()
  586. goto TriggerSuccess
  587. }
  588. }
  589. }
  590. TriggerSuccess:
  591. subscribersMutexes[i].Unlock()
  592. }
  593. subscribersTimeMutexes[i].Unlock()
  594. },
  595. })
  596. }
  597. // 14
  598. if topic == masterConfig.TopicOfHeartbeatInfo && (len(masterConfig.RuleOfHeartbeatInfo1) > 0 || len(masterConfig.RuleOfHeartbeatInfo2) > 0) {
  599. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  600. Node: commonConfig.RosNode,
  601. Topic: topic,
  602. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  603. subscribersTimeMutexes[i].Lock()
  604. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  605. subscribersMutexes[i].Lock()
  606. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  607. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  608. faultLabel := ""
  609. if len(masterConfig.RuleOfHeartbeatInfo1) > 0 {
  610. for _, f := range masterConfig.RuleOfHeartbeatInfo1 {
  611. faultLabel = f(data)
  612. if faultLabel != "" {
  613. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  614. subscribersTimes[i] = time.Now()
  615. goto TriggerSuccess
  616. }
  617. }
  618. }
  619. if len(masterConfig.RuleOfHeartbeatInfo2) > 0 {
  620. for _, f := range masterConfig.RuleOfHeartbeatInfo2 {
  621. faultLabel = f(data, &pjisuvParam)
  622. if faultLabel != "" {
  623. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  624. subscribersTimes[i] = time.Now()
  625. goto TriggerSuccess
  626. }
  627. }
  628. }
  629. TriggerSuccess:
  630. subscribersMutexes[i].Unlock()
  631. }
  632. subscribersTimeMutexes[i].Unlock()
  633. },
  634. })
  635. }
  636. // 15
  637. if topic == masterConfig.TopicOfLidarPretreatmentCost && (len(masterConfig.RuleOfLidarPretreatmentCost1) > 0 || len(masterConfig.RuleOfLidarPretreatmentCost2) > 0) {
  638. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  639. Node: commonConfig.RosNode,
  640. Topic: topic,
  641. Callback: func(data *geometry_msgs.Vector3Stamped) {
  642. subscribersTimeMutexes[i].Lock()
  643. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  644. subscribersMutexes[i].Lock()
  645. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  646. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  647. faultLabel := ""
  648. if len(masterConfig.RuleOfLidarPretreatmentCost1) > 0 {
  649. for _, f := range masterConfig.RuleOfLidarPretreatmentCost1 {
  650. faultLabel = f(data)
  651. if faultLabel != "" {
  652. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  653. subscribersTimes[i] = time.Now()
  654. goto TriggerSuccess
  655. }
  656. }
  657. }
  658. if len(masterConfig.RuleOfLidarPretreatmentCost2) > 0 {
  659. for _, f := range masterConfig.RuleOfLidarPretreatmentCost2 {
  660. faultLabel = f(data, &pjisuvParam)
  661. if faultLabel != "" {
  662. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  663. subscribersTimes[i] = time.Now()
  664. goto TriggerSuccess
  665. }
  666. }
  667. }
  668. TriggerSuccess:
  669. subscribersMutexes[i].Unlock()
  670. }
  671. subscribersTimeMutexes[i].Unlock()
  672. },
  673. })
  674. }
  675. // 16
  676. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && (len(masterConfig.RuleOfLidarPretreatmentOdometry1) > 0 || len(masterConfig.RuleOfLidarPretreatmentOdometry2) > 0) {
  677. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  678. Node: commonConfig.RosNode,
  679. Topic: topic,
  680. Callback: func(data *nav_msgs.Odometry) {
  681. subscribersTimeMutexes[i].Lock()
  682. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  683. subscribersMutexes[i].Lock()
  684. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  685. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  686. faultLabel := ""
  687. if len(masterConfig.RuleOfLidarPretreatmentOdometry1) > 0 {
  688. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry1 {
  689. faultLabel = f(data)
  690. if faultLabel != "" {
  691. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  692. subscribersTimes[i] = time.Now()
  693. goto TriggerSuccess
  694. }
  695. }
  696. }
  697. if len(masterConfig.RuleOfLidarPretreatmentOdometry2) > 0 {
  698. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry2 {
  699. faultLabel = f(data, &pjisuvParam)
  700. if faultLabel != "" {
  701. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  702. subscribersTimes[i] = time.Now()
  703. goto TriggerSuccess
  704. }
  705. }
  706. }
  707. TriggerSuccess:
  708. subscribersMutexes[i].Unlock()
  709. }
  710. subscribersTimeMutexes[i].Unlock()
  711. },
  712. })
  713. }
  714. // 17
  715. if topic == masterConfig.TopicOfLidarRoi && (len(masterConfig.RuleOfLidarRoi1) > 0 || len(masterConfig.RuleOfLidarRoi2) > 0) {
  716. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  717. Node: commonConfig.RosNode,
  718. Topic: topic,
  719. Callback: func(data *geometry_msgs.PolygonStamped) {
  720. subscribersTimeMutexes[i].Lock()
  721. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  722. subscribersMutexes[i].Lock()
  723. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  724. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  725. faultLabel := ""
  726. if len(masterConfig.RuleOfLidarRoi1) > 0 {
  727. for _, f := range masterConfig.RuleOfLidarRoi1 {
  728. faultLabel = f(data)
  729. if faultLabel != "" {
  730. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  731. subscribersTimes[i] = time.Now()
  732. goto TriggerSuccess
  733. }
  734. }
  735. }
  736. if len(masterConfig.RuleOfLidarRoi2) > 0 {
  737. for _, f := range masterConfig.RuleOfLidarRoi2 {
  738. faultLabel = f(data, &pjisuvParam)
  739. if faultLabel != "" {
  740. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  741. subscribersTimes[i] = time.Now()
  742. goto TriggerSuccess
  743. }
  744. }
  745. }
  746. TriggerSuccess:
  747. subscribersMutexes[i].Unlock()
  748. }
  749. subscribersTimeMutexes[i].Unlock()
  750. },
  751. })
  752. }
  753. // 18
  754. if topic == masterConfig.TopicOfLine1 && (len(masterConfig.RuleOfLine11) > 0 || len(masterConfig.RuleOfLine12) > 0) {
  755. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  756. Node: commonConfig.RosNode,
  757. Topic: topic,
  758. Callback: func(data *nav_msgs.Path) {
  759. subscribersTimeMutexes[i].Lock()
  760. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  761. subscribersMutexes[i].Lock()
  762. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  763. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  764. faultLabel := ""
  765. if len(masterConfig.RuleOfLine11) > 0 {
  766. for _, f := range masterConfig.RuleOfLine11 {
  767. faultLabel = f(data)
  768. if faultLabel != "" {
  769. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  770. subscribersTimes[i] = time.Now()
  771. goto TriggerSuccess
  772. }
  773. }
  774. }
  775. if len(masterConfig.RuleOfLine12) > 0 {
  776. for _, f := range masterConfig.RuleOfLine12 {
  777. faultLabel = f(data, &pjisuvParam)
  778. if faultLabel != "" {
  779. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  780. subscribersTimes[i] = time.Now()
  781. goto TriggerSuccess
  782. }
  783. }
  784. }
  785. TriggerSuccess:
  786. subscribersMutexes[i].Unlock()
  787. }
  788. subscribersTimeMutexes[i].Unlock()
  789. },
  790. })
  791. }
  792. // 19
  793. if topic == masterConfig.TopicOfLine2 && (len(masterConfig.RuleOfLine21) > 0 || len(masterConfig.RuleOfLine22) > 0) {
  794. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  795. Node: commonConfig.RosNode,
  796. Topic: topic,
  797. Callback: func(data *nav_msgs.Path) {
  798. subscribersTimeMutexes[i].Lock()
  799. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  800. subscribersMutexes[i].Lock()
  801. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  802. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  803. faultLabel := ""
  804. if len(masterConfig.RuleOfLine21) > 0 {
  805. for _, f := range masterConfig.RuleOfLine21 {
  806. faultLabel = f(data)
  807. if faultLabel != "" {
  808. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  809. subscribersTimes[i] = time.Now()
  810. goto TriggerSuccess
  811. }
  812. }
  813. }
  814. if len(masterConfig.RuleOfLine22) > 0 {
  815. for _, f := range masterConfig.RuleOfLine22 {
  816. faultLabel = f(data, &pjisuvParam)
  817. if faultLabel != "" {
  818. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  819. subscribersTimes[i] = time.Now()
  820. goto TriggerSuccess
  821. }
  822. }
  823. }
  824. TriggerSuccess:
  825. subscribersMutexes[i].Unlock()
  826. }
  827. subscribersTimeMutexes[i].Unlock()
  828. },
  829. })
  830. }
  831. // 20
  832. if topic == masterConfig.TopicOfMapPolygon && (len(masterConfig.RuleOfMapPolygon1) > 0 || len(masterConfig.RuleOfMapPolygon2) > 0) {
  833. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  834. Node: commonConfig.RosNode,
  835. Topic: topic,
  836. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  837. subscribersTimeMutexes[i].Lock()
  838. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  839. subscribersMutexes[i].Lock()
  840. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  841. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  842. faultLabel := ""
  843. if len(masterConfig.RuleOfMapPolygon1) > 0 {
  844. for _, f := range masterConfig.RuleOfMapPolygon1 {
  845. faultLabel = f(data)
  846. if faultLabel != "" {
  847. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  848. subscribersTimes[i] = time.Now()
  849. goto TriggerSuccess
  850. }
  851. }
  852. }
  853. if len(masterConfig.RuleOfMapPolygon2) > 0 {
  854. for _, f := range masterConfig.RuleOfMapPolygon2 {
  855. faultLabel = f(data, &pjisuvParam)
  856. if faultLabel != "" {
  857. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  858. subscribersTimes[i] = time.Now()
  859. goto TriggerSuccess
  860. }
  861. }
  862. }
  863. TriggerSuccess:
  864. subscribersMutexes[i].Unlock()
  865. }
  866. subscribersTimeMutexes[i].Unlock()
  867. },
  868. })
  869. }
  870. // 21
  871. if topic == masterConfig.TopicOfObstacleDisplay && (len(masterConfig.RuleOfObstacleDisplay1) > 0 || len(masterConfig.RuleOfObstacleDisplay2) > 0) {
  872. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  873. Node: commonConfig.RosNode,
  874. Topic: topic,
  875. Callback: func(data *visualization_msgs.MarkerArray) {
  876. subscribersTimeMutexes[i].Lock()
  877. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  878. subscribersMutexes[i].Lock()
  879. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  880. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  881. faultLabel := ""
  882. if len(masterConfig.RuleOfObstacleDisplay1) > 0 {
  883. for _, f := range masterConfig.RuleOfObstacleDisplay1 {
  884. faultLabel = f(data)
  885. if faultLabel != "" {
  886. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  887. subscribersTimes[i] = time.Now()
  888. goto TriggerSuccess
  889. }
  890. }
  891. }
  892. if len(masterConfig.RuleOfObstacleDisplay2) > 0 {
  893. for _, f := range masterConfig.RuleOfObstacleDisplay2 {
  894. faultLabel = f(data, &pjisuvParam)
  895. if faultLabel != "" {
  896. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  897. subscribersTimes[i] = time.Now()
  898. goto TriggerSuccess
  899. }
  900. }
  901. }
  902. TriggerSuccess:
  903. subscribersMutexes[i].Unlock()
  904. }
  905. subscribersTimeMutexes[i].Unlock()
  906. },
  907. })
  908. }
  909. // 22
  910. if topic == masterConfig.TopicOfPjControlPub && (len(masterConfig.RuleOfPjControlPub1) > 0 || len(masterConfig.RuleOfPjControlPub2) > 0) {
  911. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  912. Node: commonConfig.RosNode,
  913. Topic: topic,
  914. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  915. // 更新共享变量
  916. mutexOfPjControlPub.RLock()
  917. {
  918. pjisuvParam.NumCountPjiControlCommandOfPjControlPub++
  919. if pjisuvParam.NumCountPjiControlCommandOfPjControlPub == 10 {
  920. pjisuvParam.EgoSteeringCmdOfPjControlPub = append(pjisuvParam.EgoSteeringCmdOfPjControlPub, data.ICPVCmdStrAngle)
  921. pjisuvParam.EgoThrottleCmdOfPjControlPub = append(pjisuvParam.EgoThrottleCmdOfPjControlPub, data.ICPVCmdAccPelPosAct)
  922. pjisuvParam.NumCountPjiControlCommandOfPjControlPub = 0
  923. }
  924. }
  925. mutexOfPjControlPub.RUnlock()
  926. subscribersTimeMutexes[i].Lock()
  927. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  928. subscribersMutexes[i].Lock()
  929. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  930. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  931. faultLabel := ""
  932. if len(masterConfig.RuleOfPjControlPub1) > 0 {
  933. for _, f := range masterConfig.RuleOfPjControlPub1 {
  934. faultLabel = f(data)
  935. if faultLabel != "" {
  936. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  937. subscribersTimes[i] = time.Now()
  938. goto TriggerSuccess
  939. }
  940. }
  941. }
  942. if len(masterConfig.RuleOfPjControlPub2) > 0 {
  943. for _, f := range masterConfig.RuleOfPjControlPub2 {
  944. faultLabel = f(data, &pjisuvParam)
  945. if faultLabel != "" {
  946. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  947. subscribersTimes[i] = time.Now()
  948. goto TriggerSuccess
  949. }
  950. }
  951. }
  952. TriggerSuccess:
  953. subscribersMutexes[i].Unlock()
  954. }
  955. subscribersTimeMutexes[i].Unlock()
  956. },
  957. })
  958. }
  959. // 23
  960. if topic == masterConfig.TopicOfPointsCluster && (len(masterConfig.RuleOfPointsCluster1) > 0 || len(masterConfig.RuleOfPointsCluster2) > 0) {
  961. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  962. Node: commonConfig.RosNode,
  963. Topic: topic,
  964. Callback: func(data *sensor_msgs.PointCloud2) {
  965. subscribersTimeMutexes[i].Lock()
  966. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  967. subscribersMutexes[i].Lock()
  968. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  969. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  970. faultLabel := ""
  971. if len(masterConfig.RuleOfPointsCluster1) > 0 {
  972. for _, f := range masterConfig.RuleOfPointsCluster1 {
  973. faultLabel = f(data)
  974. if faultLabel != "" {
  975. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  976. subscribersTimes[i] = time.Now()
  977. goto TriggerSuccess
  978. }
  979. }
  980. }
  981. if len(masterConfig.RuleOfPointsCluster2) > 0 {
  982. for _, f := range masterConfig.RuleOfPointsCluster2 {
  983. faultLabel = f(data, &pjisuvParam)
  984. if faultLabel != "" {
  985. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  986. subscribersTimes[i] = time.Now()
  987. goto TriggerSuccess
  988. }
  989. }
  990. }
  991. TriggerSuccess:
  992. subscribersMutexes[i].Unlock()
  993. }
  994. subscribersTimeMutexes[i].Unlock()
  995. },
  996. })
  997. }
  998. // 24
  999. if topic == masterConfig.TopicOfPointsConcat && (len(masterConfig.RuleOfPointsConcat1) > 0 || len(masterConfig.RuleOfPointsConcat2) > 0) {
  1000. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1001. Node: commonConfig.RosNode,
  1002. Topic: topic,
  1003. Callback: func(data *sensor_msgs.PointCloud2) {
  1004. subscribersTimeMutexes[i].Lock()
  1005. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1006. subscribersMutexes[i].Lock()
  1007. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1008. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1009. faultLabel := ""
  1010. if len(masterConfig.RuleOfPointsConcat1) > 0 {
  1011. for _, f := range masterConfig.RuleOfPointsConcat1 {
  1012. faultLabel = f(data)
  1013. if faultLabel != "" {
  1014. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1015. subscribersTimes[i] = time.Now()
  1016. goto TriggerSuccess
  1017. }
  1018. }
  1019. }
  1020. if len(masterConfig.RuleOfPointsConcat2) > 0 {
  1021. for _, f := range masterConfig.RuleOfPointsConcat2 {
  1022. faultLabel = f(data, &pjisuvParam)
  1023. if faultLabel != "" {
  1024. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1025. subscribersTimes[i] = time.Now()
  1026. goto TriggerSuccess
  1027. }
  1028. }
  1029. }
  1030. TriggerSuccess:
  1031. subscribersMutexes[i].Unlock()
  1032. }
  1033. subscribersTimeMutexes[i].Unlock()
  1034. },
  1035. })
  1036. }
  1037. // 25
  1038. if topic == masterConfig.TopicOfReferenceDisplay && (len(masterConfig.RuleOfReferenceDisplay1) > 0 || len(masterConfig.RuleOfReferenceDisplay2) > 0) {
  1039. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1040. Node: commonConfig.RosNode,
  1041. Topic: topic,
  1042. Callback: func(data *nav_msgs.Path) {
  1043. subscribersTimeMutexes[i].Lock()
  1044. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1045. subscribersMutexes[i].Lock()
  1046. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1047. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1048. faultLabel := ""
  1049. if len(masterConfig.RuleOfReferenceDisplay1) > 0 {
  1050. for _, f := range masterConfig.RuleOfReferenceDisplay1 {
  1051. faultLabel = f(data)
  1052. if faultLabel != "" {
  1053. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1054. subscribersTimes[i] = time.Now()
  1055. goto TriggerSuccess
  1056. }
  1057. }
  1058. }
  1059. if len(masterConfig.RuleOfReferenceDisplay2) > 0 {
  1060. for _, f := range masterConfig.RuleOfReferenceDisplay2 {
  1061. faultLabel = f(data, &pjisuvParam)
  1062. if faultLabel != "" {
  1063. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1064. subscribersTimes[i] = time.Now()
  1065. goto TriggerSuccess
  1066. }
  1067. }
  1068. }
  1069. TriggerSuccess:
  1070. subscribersMutexes[i].Unlock()
  1071. }
  1072. subscribersTimeMutexes[i].Unlock()
  1073. },
  1074. })
  1075. }
  1076. // 26
  1077. if topic == masterConfig.TopicOfReferenceTrajectory && (len(masterConfig.RuleOfReferenceTrajectory1) > 0 || len(masterConfig.RuleOfReferenceTrajectory2) > 0) {
  1078. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1079. Node: commonConfig.RosNode,
  1080. Topic: topic,
  1081. Callback: func(data *pjisuv_msgs.Trajectory) {
  1082. subscribersTimeMutexes[i].Lock()
  1083. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1084. subscribersMutexes[i].Lock()
  1085. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1086. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1087. faultLabel := ""
  1088. if len(masterConfig.RuleOfReferenceTrajectory1) > 0 {
  1089. for _, f := range masterConfig.RuleOfReferenceTrajectory1 {
  1090. faultLabel = f(data)
  1091. if faultLabel != "" {
  1092. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1093. subscribersTimes[i] = time.Now()
  1094. goto TriggerSuccess
  1095. }
  1096. }
  1097. }
  1098. if len(masterConfig.RuleOfReferenceTrajectory2) > 0 {
  1099. for _, f := range masterConfig.RuleOfReferenceTrajectory2 {
  1100. faultLabel = f(data, &pjisuvParam)
  1101. if faultLabel != "" {
  1102. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1103. subscribersTimes[i] = time.Now()
  1104. goto TriggerSuccess
  1105. }
  1106. }
  1107. }
  1108. TriggerSuccess:
  1109. subscribersMutexes[i].Unlock()
  1110. }
  1111. subscribersTimeMutexes[i].Unlock()
  1112. },
  1113. })
  1114. }
  1115. // 27
  1116. if topic == masterConfig.TopicOfRoiPoints && (len(masterConfig.RuleOfRoiPoints1) > 0 || len(masterConfig.RuleOfRoiPoints2) > 0) {
  1117. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1118. Node: commonConfig.RosNode,
  1119. Topic: topic,
  1120. Callback: func(data *sensor_msgs.PointCloud2) {
  1121. subscribersTimeMutexes[i].Lock()
  1122. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1123. subscribersMutexes[i].Lock()
  1124. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1125. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1126. faultLabel := ""
  1127. if len(masterConfig.RuleOfRoiPoints1) > 0 {
  1128. for _, f := range masterConfig.RuleOfRoiPoints1 {
  1129. faultLabel = f(data)
  1130. if faultLabel != "" {
  1131. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1132. subscribersTimes[i] = time.Now()
  1133. goto TriggerSuccess
  1134. }
  1135. }
  1136. }
  1137. if len(masterConfig.RuleOfRoiPoints2) > 0 {
  1138. for _, f := range masterConfig.RuleOfRoiPoints2 {
  1139. faultLabel = f(data, &pjisuvParam)
  1140. if faultLabel != "" {
  1141. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1142. subscribersTimes[i] = time.Now()
  1143. goto TriggerSuccess
  1144. }
  1145. }
  1146. }
  1147. TriggerSuccess:
  1148. subscribersMutexes[i].Unlock()
  1149. }
  1150. subscribersTimeMutexes[i].Unlock()
  1151. },
  1152. })
  1153. }
  1154. // 28
  1155. if topic == masterConfig.TopicOfRoiPolygon && (len(masterConfig.RuleOfRoiPolygon1) > 0 || len(masterConfig.RuleOfRoiPolygon2) > 0) {
  1156. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1157. Node: commonConfig.RosNode,
  1158. Topic: topic,
  1159. Callback: func(data *nav_msgs.Path) {
  1160. subscribersTimeMutexes[i].Lock()
  1161. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1162. subscribersMutexes[i].Lock()
  1163. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1164. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1165. faultLabel := ""
  1166. if len(masterConfig.RuleOfRoiPolygon1) > 0 {
  1167. for _, f := range masterConfig.RuleOfRoiPolygon1 {
  1168. faultLabel = f(data)
  1169. if faultLabel != "" {
  1170. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1171. subscribersTimes[i] = time.Now()
  1172. goto TriggerSuccess
  1173. }
  1174. }
  1175. }
  1176. if len(masterConfig.RuleOfRoiPolygon2) > 0 {
  1177. for _, f := range masterConfig.RuleOfRoiPolygon2 {
  1178. faultLabel = f(data, &pjisuvParam)
  1179. if faultLabel != "" {
  1180. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1181. subscribersTimes[i] = time.Now()
  1182. goto TriggerSuccess
  1183. }
  1184. }
  1185. }
  1186. TriggerSuccess:
  1187. subscribersMutexes[i].Unlock()
  1188. }
  1189. subscribersTimeMutexes[i].Unlock()
  1190. },
  1191. })
  1192. }
  1193. // 29
  1194. if topic == masterConfig.TopicOfTf && (len(masterConfig.RuleOfTf1) > 0 || len(masterConfig.RuleOfTf2) > 0) {
  1195. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1196. Node: commonConfig.RosNode,
  1197. Topic: topic,
  1198. Callback: func(data *tf2_msgs.TFMessage) {
  1199. subscribersTimeMutexes[i].Lock()
  1200. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1201. subscribersMutexes[i].Lock()
  1202. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1203. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1204. faultLabel := ""
  1205. if len(masterConfig.RuleOfTf1) > 0 {
  1206. for _, f := range masterConfig.RuleOfTf1 {
  1207. faultLabel = f(data)
  1208. if faultLabel != "" {
  1209. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1210. subscribersTimes[i] = time.Now()
  1211. goto TriggerSuccess
  1212. }
  1213. }
  1214. }
  1215. if len(masterConfig.RuleOfTf2) > 0 {
  1216. for _, f := range masterConfig.RuleOfTf2 {
  1217. faultLabel = f(data, &pjisuvParam)
  1218. if faultLabel != "" {
  1219. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1220. subscribersTimes[i] = time.Now()
  1221. goto TriggerSuccess
  1222. }
  1223. }
  1224. }
  1225. TriggerSuccess:
  1226. subscribersMutexes[i].Unlock()
  1227. }
  1228. subscribersTimeMutexes[i].Unlock()
  1229. },
  1230. })
  1231. }
  1232. // 30
  1233. if topic == masterConfig.TopicOfTpperception && (len(masterConfig.RuleOfTpperception1) > 0 || len(masterConfig.RuleOfTpperception2) > 0) {
  1234. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1235. Node: commonConfig.RosNode,
  1236. Topic: topic,
  1237. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  1238. // 更新共享变量
  1239. mutexOfTpperception.RLock()
  1240. {
  1241. for _, obj := range data.Objs {
  1242. if obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  1243. continue
  1244. }
  1245. // 检查 ObjDicOfTpperception 是否为 nil,如果是,则初始化它
  1246. if pjisuvParam.ObjDicOfTpperception == nil {
  1247. pjisuvParam.ObjDicOfTpperception = make(map[uint32][]float32)
  1248. }
  1249. if _, ok := pjisuvParam.ObjDicOfTpperception[obj.Id]; !ok {
  1250. pjisuvParam.ObjDicOfTpperception[obj.Id] = []float32{}
  1251. }
  1252. pjisuvParam.ObjDicOfTpperception[obj.Id] = append(pjisuvParam.ObjDicOfTpperception[obj.Id], obj.Y)
  1253. }
  1254. }
  1255. mutexOfTpperception.RUnlock()
  1256. subscribersTimeMutexes[i].Lock()
  1257. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1258. subscribersMutexes[i].Lock()
  1259. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1260. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1261. faultLabel := ""
  1262. if len(masterConfig.RuleOfTpperception1) > 0 {
  1263. for _, f := range masterConfig.RuleOfTpperception1 {
  1264. faultLabel = f(data)
  1265. if faultLabel != "" {
  1266. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1267. subscribersTimes[i] = time.Now()
  1268. goto TriggerSuccess
  1269. }
  1270. }
  1271. }
  1272. if len(masterConfig.RuleOfTpperception2) > 0 {
  1273. for _, f := range masterConfig.RuleOfTpperception2 {
  1274. faultLabel = f(data, &pjisuvParam)
  1275. if faultLabel != "" {
  1276. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1277. subscribersTimes[i] = time.Now()
  1278. goto TriggerSuccess
  1279. }
  1280. }
  1281. }
  1282. TriggerSuccess:
  1283. subscribersMutexes[i].Unlock()
  1284. }
  1285. subscribersTimeMutexes[i].Unlock()
  1286. // -------- 触发后更新共享变量
  1287. mutexOfTpperception.RLock()
  1288. {
  1289. for _, obj := range data.Objs {
  1290. pjisuvParam.ObjTypeDicOfTpperception[obj.Id] = obj.Type
  1291. pjisuvParam.ObjSpeedDicOfTpperception[obj.Id] = math.Pow(math.Pow(float64(obj.Vxabs), 2)+math.Pow(float64(obj.Vyabs), 2), 0.5)
  1292. }
  1293. }
  1294. mutexOfTpperception.RUnlock()
  1295. },
  1296. })
  1297. }
  1298. // 31
  1299. if topic == masterConfig.TopicOfTpperceptionVis && (len(masterConfig.RuleOfTpperceptionVis1) > 0 || len(masterConfig.RuleOfTpperceptionVis2) > 0) {
  1300. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1301. Node: commonConfig.RosNode,
  1302. Topic: topic,
  1303. Callback: func(data *visualization_msgs.MarkerArray) {
  1304. subscribersTimeMutexes[i].Lock()
  1305. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1306. subscribersMutexes[i].Lock()
  1307. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1308. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1309. faultLabel := ""
  1310. if len(masterConfig.RuleOfTpperceptionVis1) > 0 {
  1311. for _, f := range masterConfig.RuleOfTpperceptionVis1 {
  1312. faultLabel = f(data)
  1313. if faultLabel != "" {
  1314. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1315. subscribersTimes[i] = time.Now()
  1316. goto TriggerSuccess
  1317. }
  1318. }
  1319. }
  1320. if len(masterConfig.RuleOfTpperceptionVis2) > 0 {
  1321. for _, f := range masterConfig.RuleOfTpperceptionVis2 {
  1322. faultLabel = f(data, &pjisuvParam)
  1323. if faultLabel != "" {
  1324. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1325. subscribersTimes[i] = time.Now()
  1326. goto TriggerSuccess
  1327. }
  1328. }
  1329. }
  1330. TriggerSuccess:
  1331. subscribersMutexes[i].Unlock()
  1332. }
  1333. subscribersTimeMutexes[i].Unlock()
  1334. },
  1335. })
  1336. }
  1337. // 32
  1338. if topic == masterConfig.TopicOfTprouteplan && (len(masterConfig.RuleOfTprouteplan1) > 0 || len(masterConfig.RuleOfTprouteplan2) > 0) {
  1339. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1340. Node: commonConfig.RosNode,
  1341. Topic: topic,
  1342. Callback: func(data *pjisuv_msgs.RoutePlan) {
  1343. subscribersTimeMutexes[i].Lock()
  1344. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1345. subscribersMutexes[i].Lock()
  1346. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1347. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1348. faultLabel := ""
  1349. if len(masterConfig.RuleOfTprouteplan1) > 0 {
  1350. for _, f := range masterConfig.RuleOfTprouteplan1 {
  1351. faultLabel = f(data)
  1352. if faultLabel != "" {
  1353. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1354. subscribersTimes[i] = time.Now()
  1355. goto TriggerSuccess
  1356. }
  1357. }
  1358. }
  1359. if len(masterConfig.RuleOfTprouteplan2) > 0 {
  1360. for _, f := range masterConfig.RuleOfTprouteplan2 {
  1361. faultLabel = f(data, &pjisuvParam)
  1362. if faultLabel != "" {
  1363. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1364. subscribersTimes[i] = time.Now()
  1365. goto TriggerSuccess
  1366. }
  1367. }
  1368. }
  1369. TriggerSuccess:
  1370. subscribersMutexes[i].Unlock()
  1371. }
  1372. subscribersTimeMutexes[i].Unlock()
  1373. },
  1374. })
  1375. }
  1376. // 33
  1377. if topic == masterConfig.TopicOfTrajectoryDisplay && (len(masterConfig.RuleOfTrajectoryDisplay1) > 0 || len(masterConfig.RuleOfTrajectoryDisplay2) > 0) {
  1378. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1379. Node: commonConfig.RosNode,
  1380. Topic: topic,
  1381. Callback: func(data *nav_msgs.Path) {
  1382. subscribersTimeMutexes[i].Lock()
  1383. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1384. subscribersMutexes[i].Lock()
  1385. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1386. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1387. faultLabel := ""
  1388. if len(masterConfig.RuleOfTrajectoryDisplay1) > 0 {
  1389. for _, f := range masterConfig.RuleOfTrajectoryDisplay1 {
  1390. faultLabel = f(data)
  1391. if faultLabel != "" {
  1392. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1393. subscribersTimes[i] = time.Now()
  1394. goto TriggerSuccess
  1395. }
  1396. }
  1397. }
  1398. if len(masterConfig.RuleOfTrajectoryDisplay2) > 0 {
  1399. for _, f := range masterConfig.RuleOfTrajectoryDisplay2 {
  1400. faultLabel = f(data, &pjisuvParam)
  1401. if faultLabel != "" {
  1402. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1403. subscribersTimes[i] = time.Now()
  1404. goto TriggerSuccess
  1405. }
  1406. }
  1407. }
  1408. TriggerSuccess:
  1409. subscribersMutexes[i].Unlock()
  1410. }
  1411. subscribersTimeMutexes[i].Unlock()
  1412. },
  1413. })
  1414. }
  1415. // 34
  1416. if topic == masterConfig.TopicOfUngroundCloudpoints && (len(masterConfig.RuleOfUngroundCloudpoints1) > 0 || len(masterConfig.RuleOfUngroundCloudpoints2) > 0) {
  1417. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1418. Node: commonConfig.RosNode,
  1419. Topic: topic,
  1420. Callback: func(data *sensor_msgs.PointCloud2) {
  1421. subscribersTimeMutexes[i].Lock()
  1422. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1423. subscribersMutexes[i].Lock()
  1424. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1425. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1426. faultLabel := ""
  1427. if len(masterConfig.RuleOfUngroundCloudpoints1) > 0 {
  1428. for _, f := range masterConfig.RuleOfUngroundCloudpoints1 {
  1429. faultLabel = f(data)
  1430. if faultLabel != "" {
  1431. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1432. subscribersTimes[i] = time.Now()
  1433. goto TriggerSuccess
  1434. }
  1435. }
  1436. }
  1437. if len(masterConfig.RuleOfUngroundCloudpoints2) > 0 {
  1438. for _, f := range masterConfig.RuleOfUngroundCloudpoints2 {
  1439. faultLabel = f(data, &pjisuvParam)
  1440. if faultLabel != "" {
  1441. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1442. subscribersTimes[i] = time.Now()
  1443. goto TriggerSuccess
  1444. }
  1445. }
  1446. }
  1447. TriggerSuccess:
  1448. subscribersMutexes[i].Unlock()
  1449. }
  1450. subscribersTimeMutexes[i].Unlock()
  1451. },
  1452. })
  1453. }
  1454. // 35
  1455. if topic == masterConfig.TopicOfCameraImage && (len(masterConfig.RuleOfCameraImage1) > 0 || len(masterConfig.RuleOfCameraImage2) > 0) {
  1456. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1457. Node: commonConfig.RosNode,
  1458. Topic: topic,
  1459. Callback: func(data *sensor_msgs.Image) {
  1460. subscribersTimeMutexes[i].Lock()
  1461. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1462. subscribersMutexes[i].Lock()
  1463. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1464. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1465. faultLabel := ""
  1466. if len(masterConfig.RuleOfCameraImage1) > 0 {
  1467. for _, f := range masterConfig.RuleOfCameraImage1 {
  1468. faultLabel = f(data)
  1469. if faultLabel != "" {
  1470. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1471. subscribersTimes[i] = time.Now()
  1472. goto TriggerSuccess
  1473. }
  1474. }
  1475. }
  1476. if len(masterConfig.RuleOfCameraImage2) > 0 {
  1477. for _, f := range masterConfig.RuleOfCameraImage2 {
  1478. faultLabel = f(data, &pjisuvParam)
  1479. if faultLabel != "" {
  1480. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1481. subscribersTimes[i] = time.Now()
  1482. goto TriggerSuccess
  1483. }
  1484. }
  1485. }
  1486. TriggerSuccess:
  1487. subscribersMutexes[i].Unlock()
  1488. }
  1489. subscribersTimeMutexes[i].Unlock()
  1490. },
  1491. })
  1492. }
  1493. // 36
  1494. if topic == masterConfig.TopicOfDataRead && (len(masterConfig.RuleOfDataRead1) > 0 || len(masterConfig.RuleOfDataRead2) > 0) {
  1495. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1496. Node: commonConfig.RosNode,
  1497. Topic: topic,
  1498. Callback: func(data *pjisuv_msgs.Retrieval) {
  1499. // 更新共享变量
  1500. mutexOfDataRead.RLock()
  1501. {
  1502. pjisuvParam.NumCountDataReadOfDataRead++
  1503. if pjisuvParam.NumCountDataReadOfDataRead == 10 {
  1504. pjisuvParam.EgoSteeringRealOfDataRead = append(pjisuvParam.EgoSteeringRealOfDataRead, data.ActStrWhAng)
  1505. pjisuvParam.EgoThrottleRealOfDataRead = append(pjisuvParam.EgoThrottleRealOfDataRead, data.AccPed2)
  1506. pjisuvParam.NumCountDataReadOfDataRead = 0
  1507. }
  1508. pjisuvParam.StrgAngleRealValueOfDataRead = data.ActStrWhAng
  1509. }
  1510. mutexOfDataRead.RUnlock()
  1511. subscribersTimeMutexes[i].Lock()
  1512. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1513. subscribersMutexes[i].Lock()
  1514. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1515. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1516. faultLabel := ""
  1517. if len(masterConfig.RuleOfDataRead1) > 0 {
  1518. for _, f := range masterConfig.RuleOfDataRead1 {
  1519. faultLabel = f(data)
  1520. if faultLabel != "" {
  1521. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1522. subscribersTimes[i] = time.Now()
  1523. goto TriggerSuccess
  1524. }
  1525. }
  1526. }
  1527. if len(masterConfig.RuleOfDataRead2) > 0 {
  1528. for _, f := range masterConfig.RuleOfDataRead2 {
  1529. faultLabel = f(data, &pjisuvParam)
  1530. if faultLabel != "" {
  1531. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1532. subscribersTimes[i] = time.Now()
  1533. goto TriggerSuccess
  1534. }
  1535. }
  1536. }
  1537. TriggerSuccess:
  1538. subscribersMutexes[i].Unlock()
  1539. }
  1540. subscribersTimeMutexes[i].Unlock()
  1541. },
  1542. })
  1543. }
  1544. // 37
  1545. if topic == masterConfig.TopicOfPjiGps && (len(masterConfig.RuleOfPjiGps1) > 0 || len(masterConfig.RuleOfPjiGps2) > 0) {
  1546. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1547. Node: commonConfig.RosNode,
  1548. Topic: topic,
  1549. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  1550. subscribersTimeMutexes[i].Lock()
  1551. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1552. subscribersMutexes[i].Lock()
  1553. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1554. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1555. faultLabel := ""
  1556. if len(masterConfig.RuleOfPjiGps1) > 0 {
  1557. for _, f := range masterConfig.RuleOfPjiGps1 {
  1558. faultLabel = f(data)
  1559. if faultLabel != "" {
  1560. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1561. subscribersTimes[i] = time.Now()
  1562. goto TriggerSuccess
  1563. }
  1564. }
  1565. }
  1566. if len(masterConfig.RuleOfPjiGps2) > 0 {
  1567. for _, f := range masterConfig.RuleOfPjiGps2 {
  1568. faultLabel = f(data, &pjisuvParam)
  1569. if faultLabel != "" {
  1570. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1571. subscribersTimes[i] = time.Now()
  1572. goto TriggerSuccess
  1573. }
  1574. }
  1575. }
  1576. TriggerSuccess:
  1577. subscribersMutexes[i].Unlock()
  1578. }
  1579. subscribersTimeMutexes[i].Unlock()
  1580. },
  1581. })
  1582. }
  1583. // 39
  1584. if topic == masterConfig.TopicOfPjVehicleFdbPub && (len(masterConfig.RuleOfPjVehicleFdbPub1) > 0 || len(masterConfig.RuleOfPjVehicleFdbPub2) > 0) {
  1585. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1586. Node: commonConfig.RosNode,
  1587. Topic: topic,
  1588. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  1589. subscribersTimeMutexes[i].Lock()
  1590. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1591. subscribersMutexes[i].Lock()
  1592. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1593. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1594. faultLabel := ""
  1595. if len(masterConfig.RuleOfPjVehicleFdbPub1) > 0 {
  1596. for _, f := range masterConfig.RuleOfPjVehicleFdbPub1 {
  1597. faultLabel = f(data)
  1598. if faultLabel != "" {
  1599. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1600. subscribersTimes[i] = time.Now()
  1601. goto TriggerSuccess
  1602. }
  1603. }
  1604. }
  1605. if len(masterConfig.RuleOfPjVehicleFdbPub2) > 0 {
  1606. for _, f := range masterConfig.RuleOfPjVehicleFdbPub2 {
  1607. faultLabel = f(data, &pjisuvParam)
  1608. if faultLabel != "" {
  1609. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1610. subscribersTimes[i] = time.Now()
  1611. goto TriggerSuccess
  1612. }
  1613. }
  1614. }
  1615. TriggerSuccess:
  1616. subscribersMutexes[i].Unlock()
  1617. }
  1618. subscribersTimeMutexes[i].Unlock()
  1619. },
  1620. })
  1621. }
  1622. if err != nil {
  1623. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1624. continue
  1625. }
  1626. }
  1627. select {
  1628. case signal := <-service.ChannelKillWindowProducer:
  1629. if signal == 1 {
  1630. commonConfig.RosNode.Close()
  1631. service.AddKillTimes("3")
  1632. return
  1633. }
  1634. }
  1635. }
  1636. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  1637. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1638. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) { // 如果是不在旧故障窗口内,添加一个新窗口
  1639. exceptBegin := util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime)
  1640. finalTimeWindowBegin := ""
  1641. if util.TimeCustom1LessEqualThanTimeCustom2(exceptBegin, latestTimeWindowEnd) { // 窗口最早时间不能早于上一个窗口结束时间
  1642. finalTimeWindowBegin = latestTimeWindowEnd
  1643. } else {
  1644. finalTimeWindowBegin = exceptBegin
  1645. }
  1646. latestTimeWindowEnd = util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1647. newTimeWindow := commonEntity.TimeWindow{
  1648. FaultTime: faultHappenTime,
  1649. TimeWindowBegin: finalTimeWindowBegin,
  1650. TimeWindowEnd: latestTimeWindowEnd,
  1651. Length: util.CalculateDifferenceOfTimeCustom(finalTimeWindowBegin, latestTimeWindowEnd),
  1652. Labels: []string{faultLabel},
  1653. MasterTopics: masterTopics,
  1654. SlaveTopics: slaveTopics,
  1655. }
  1656. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1657. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1658. } else { // 如果在旧故障窗口内
  1659. commonEntity.TimeWindowProducerQueueMutex.RLock()
  1660. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  1661. // 更新故障窗口end时间
  1662. latestEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime) // 窗口最晚关闭时间是窗口开启时间加上最大任务时间
  1663. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime) // 窗口期望关闭时间是触发时间加上后置时间
  1664. if util.TimeCustom1GreaterTimeCustom2(expectEnd, latestEnd) {
  1665. latestTimeWindowEnd = latestEnd
  1666. lastTimeWindow.TimeWindowEnd = latestTimeWindowEnd
  1667. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, lastTimeWindow.TimeWindowEnd)
  1668. } else {
  1669. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1670. latestTimeWindowEnd = expectEnd
  1671. lastTimeWindow.TimeWindowEnd = latestTimeWindowEnd
  1672. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, lastTimeWindow.TimeWindowEnd)
  1673. }
  1674. }
  1675. // 更新label
  1676. labels := lastTimeWindow.Labels
  1677. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1678. // 更新 topic
  1679. sourceMasterTopics := lastTimeWindow.MasterTopics
  1680. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1681. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1682. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1683. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1684. }
  1685. }
  1686. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1687. // 获取所有需要采集的topic
  1688. var faultCodeTopics []string
  1689. for _, code := range commonConfig.CloudConfig.Triggers {
  1690. if code.Label == faultLabel {
  1691. faultCodeTopics = code.Topics
  1692. }
  1693. }
  1694. // 根据不同节点采集的topic进行分配采集
  1695. for _, acceptTopic := range faultCodeTopics {
  1696. for _, host := range commonConfig.CloudConfig.Hosts {
  1697. for _, topic := range host.Topics {
  1698. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1699. masterTopics = append(masterTopics, acceptTopic)
  1700. }
  1701. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1702. slaveTopics = append(slaveTopics, acceptTopic)
  1703. }
  1704. }
  1705. }
  1706. }
  1707. return masterTopics, slaveTopics
  1708. }