produce_window.go 45 KB


  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "github.com/bluenviron/goroslib/v2"
  11. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  16. "math"
  17. "sync"
  18. "time"
  19. )
  20. // 所有共享变量
  21. var (
  22. pjisuvParam commonEntity.PjisuvParam
  23. // /cicv_location
  24. mutexOfCicvLocation sync.RWMutex
  25. // /tpperception
  26. mutexOfTpperception sync.RWMutex
  27. // /pj_control_pub
  28. mutexOfPjControlPub sync.RWMutex
  29. // /data_read
  30. mutexOfDataRead sync.RWMutex
  31. // /pj_vehicle_fdb_pub
  32. mutexOfPjVehicleFdbPub sync.RWMutex
  33. )
  34. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  35. func PrepareTimeWindowProducerQueue() {
  36. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  37. var err error
  38. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  39. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  40. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  41. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  42. for i, topic := range commonConfig.SubscribeTopics {
  43. // !!!扩展的定时任务监听,牛逼的设计!!!扩展性拉满啦
  44. if topic == masterConfig.TopicOfCicvExtend {
  45. go func() {
  46. for {
  47. time.Sleep(time.Duration(3500) * time.Millisecond)
  48. for _, f := range masterConfig.RuleOfCicvExtend {
  49. label := f(pjisuvParam)
  50. if label != "" {
  51. saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
  52. break
  53. }
  54. }
  55. }
  56. }()
  57. }
  58. // 其他常规监听器
  59. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  60. // 1
  61. if topic == masterConfig.TopicOfAmrPose && len(masterConfig.RuleOfAmrPose) > 0 {
  62. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  63. Node: commonConfig.RosNode,
  64. Topic: topic,
  65. Callback: func(data *visualization_msgs.MarkerArray) {
  66. subscribersTimeMutexes[i].Lock()
  67. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  68. subscribersMutexes[i].Lock()
  69. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  70. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  71. var faultLabel string
  72. for _, f := range masterConfig.RuleOfAmrPose {
  73. faultLabel = f(data)
  74. if faultLabel != "" {
  75. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  76. subscribersTimes[i] = time.Now()
  77. break
  78. }
  79. }
  80. subscribersMutexes[i].Unlock()
  81. }
  82. subscribersTimeMutexes[i].Unlock()
  83. },
  84. })
  85. }
  86. // 2
  87. if topic == masterConfig.TopicOfBoundingBoxesFast && len(masterConfig.RuleOfBoundingBoxesFast) > 0 {
  88. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  89. Node: commonConfig.RosNode,
  90. Topic: topic,
  91. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  92. subscribersTimeMutexes[i].Lock()
  93. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  94. subscribersMutexes[i].Lock()
  95. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  96. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  97. var faultLabel string
  98. for _, f := range masterConfig.RuleOfBoundingBoxesFast {
  99. faultLabel = f(data)
  100. if faultLabel != "" {
  101. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  102. subscribersTimes[i] = time.Now()
  103. break
  104. }
  105. }
  106. subscribersMutexes[i].Unlock()
  107. }
  108. subscribersTimeMutexes[i].Unlock()
  109. },
  110. })
  111. }
  112. // 3
  113. if topic == masterConfig.TopicOfCameraFault && len(masterConfig.RuleOfCameraFault) > 0 {
  114. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  115. Node: commonConfig.RosNode,
  116. Topic: topic,
  117. Callback: func(data *pjisuv_msgs.FaultVec) {
  118. subscribersTimeMutexes[i].Lock()
  119. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  120. subscribersMutexes[i].Lock()
  121. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  122. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  123. var faultLabel string
  124. for _, f := range masterConfig.RuleOfCameraFault {
  125. faultLabel = f(data)
  126. if faultLabel != "" {
  127. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  128. subscribersTimes[i] = time.Now()
  129. break
  130. }
  131. }
  132. subscribersMutexes[i].Unlock()
  133. }
  134. subscribersTimeMutexes[i].Unlock()
  135. },
  136. })
  137. }
  138. // 4
  139. if topic == masterConfig.TopicOfCanData && len(masterConfig.RuleOfCanData) > 0 {
  140. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  141. Node: commonConfig.RosNode,
  142. Topic: topic,
  143. Callback: func(data *pjisuv_msgs.Frame) {
  144. subscribersTimeMutexes[i].Lock()
  145. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  146. subscribersMutexes[i].Lock()
  147. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  148. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  149. var faultLabel string
  150. for _, f := range masterConfig.RuleOfCanData {
  151. faultLabel = f(data)
  152. if faultLabel != "" {
  153. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  154. subscribersTimes[i] = time.Now()
  155. break
  156. }
  157. }
  158. subscribersMutexes[i].Unlock()
  159. }
  160. subscribersTimeMutexes[i].Unlock()
  161. },
  162. })
  163. }
  164. // 5
  165. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && len(masterConfig.RuleOfCh128x1LslidarPointCloud) > 0 {
  166. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  167. Node: commonConfig.RosNode,
  168. Topic: topic,
  169. Callback: func(data *sensor_msgs.PointCloud2) {
  170. subscribersTimeMutexes[i].Lock()
  171. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  172. subscribersMutexes[i].Lock()
  173. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  174. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  175. var faultLabel string
  176. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud {
  177. faultLabel = f(data)
  178. if faultLabel != "" {
  179. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  180. subscribersTimes[i] = time.Now()
  181. break
  182. }
  183. }
  184. subscribersMutexes[i].Unlock()
  185. }
  186. subscribersTimeMutexes[i].Unlock()
  187. },
  188. })
  189. }
  190. // 6
  191. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && len(masterConfig.RuleOfCh64wLLslidarPointCloud) > 0 {
  192. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  193. Node: commonConfig.RosNode,
  194. Topic: topic,
  195. Callback: func(data *sensor_msgs.PointCloud2) {
  196. subscribersTimeMutexes[i].Lock()
  197. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  198. subscribersMutexes[i].Lock()
  199. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  200. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  201. var faultLabel string
  202. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud {
  203. faultLabel = f(data)
  204. if faultLabel != "" {
  205. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  206. subscribersTimes[i] = time.Now()
  207. break
  208. }
  209. }
  210. subscribersMutexes[i].Unlock()
  211. }
  212. subscribersTimeMutexes[i].Unlock()
  213. },
  214. })
  215. }
  216. // 7
  217. if topic == masterConfig.TopicOfCh64wLScan && len(masterConfig.RuleOfCh64wLScan) > 0 {
  218. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  219. Node: commonConfig.RosNode,
  220. Topic: topic,
  221. Callback: func(data *sensor_msgs.LaserScan) {
  222. subscribersTimeMutexes[i].Lock()
  223. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  224. subscribersMutexes[i].Lock()
  225. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  226. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  227. var faultLabel string
  228. for _, f := range masterConfig.RuleOfCh64wLScan {
  229. faultLabel = f(data)
  230. if faultLabel != "" {
  231. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  232. subscribersTimes[i] = time.Now()
  233. break
  234. }
  235. }
  236. subscribersMutexes[i].Unlock()
  237. }
  238. subscribersTimeMutexes[i].Unlock()
  239. },
  240. })
  241. }
  242. // 8
  243. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && len(masterConfig.RuleOfCh64wRLslidarPointCloud) > 0 {
  244. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  245. Node: commonConfig.RosNode,
  246. Topic: topic,
  247. Callback: func(data *sensor_msgs.PointCloud2) {
  248. subscribersTimeMutexes[i].Lock()
  249. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  250. subscribersMutexes[i].Lock()
  251. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  252. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  253. var faultLabel string
  254. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud {
  255. faultLabel = f(data)
  256. if faultLabel != "" {
  257. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  258. subscribersTimes[i] = time.Now()
  259. break
  260. }
  261. }
  262. subscribersMutexes[i].Unlock()
  263. }
  264. subscribersTimeMutexes[i].Unlock()
  265. },
  266. })
  267. }
  268. // 9
  269. if topic == masterConfig.TopicOfCh64wRScan && len(masterConfig.RuleOfCh64wRScan) > 0 {
  270. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  271. Node: commonConfig.RosNode,
  272. Topic: topic,
  273. Callback: func(data *sensor_msgs.LaserScan) {
  274. subscribersTimeMutexes[i].Lock()
  275. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  276. subscribersMutexes[i].Lock()
  277. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  278. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  279. var faultLabel string
  280. for _, f := range masterConfig.RuleOfCh64wRScan {
  281. faultLabel = f(data)
  282. if faultLabel != "" {
  283. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  284. subscribersTimes[i] = time.Now()
  285. break
  286. }
  287. }
  288. subscribersMutexes[i].Unlock()
  289. }
  290. subscribersTimeMutexes[i].Unlock()
  291. },
  292. })
  293. }
  294. // 10
  295. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && len(masterConfig.RuleOfCicvLidarclusterMovingObjects) > 0 {
  296. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  297. Node: commonConfig.RosNode,
  298. Topic: topic,
  299. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  300. subscribersTimeMutexes[i].Lock()
  301. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  302. subscribersMutexes[i].Lock()
  303. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  304. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  305. var faultLabel string
  306. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects {
  307. faultLabel = f(data)
  308. if faultLabel != "" {
  309. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  310. subscribersTimes[i] = time.Now()
  311. break
  312. }
  313. }
  314. subscribersMutexes[i].Unlock()
  315. }
  316. subscribersTimeMutexes[i].Unlock()
  317. },
  318. })
  319. }
  320. // 11
  321. if topic == masterConfig.TopicOfCicvAmrTrajectory && len(masterConfig.RuleOfCicvAmrTrajectory) > 0 {
  322. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  323. Node: commonConfig.RosNode,
  324. Topic: topic,
  325. Callback: func(data *pjisuv_msgs.Trajectory) {
  326. subscribersTimeMutexes[i].Lock()
  327. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  328. subscribersMutexes[i].Lock()
  329. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  330. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  331. var faultLabel string
  332. for _, f := range masterConfig.RuleOfCicvAmrTrajectory {
  333. faultLabel = f(data)
  334. if faultLabel != "" {
  335. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  336. subscribersTimes[i] = time.Now()
  337. break
  338. }
  339. }
  340. subscribersMutexes[i].Unlock()
  341. }
  342. subscribersTimeMutexes[i].Unlock()
  343. },
  344. })
  345. }
  346. // 12
  347. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  348. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  349. Node: commonConfig.RosNode,
  350. Topic: topic,
  351. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  352. // 更新共享变量
  353. mutexOfCicvLocation.RLock()
  354. {
  355. pjisuvParam.VelocityYOfCicvLocation = data.VelocityX
  356. pjisuvParam.VelocityYOfCicvLocation = data.VelocityY
  357. pjisuvParam.VelocityYOfCicvLocation = data.VelocityZ
  358. pjisuvParam.YawOfCicvLocation = data.Yaw
  359. pjisuvParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
  360. pjisuvParam.PositionXOfCicvLocation = data.PositionX
  361. pjisuvParam.PositionYOfCicvLocation = data.PositionY
  362. }
  363. mutexOfCicvLocation.RUnlock()
  364. subscribersTimeMutexes[i].Lock()
  365. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  366. subscribersMutexes[i].Lock()
  367. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  368. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  369. var faultLabel string
  370. for _, f := range masterConfig.RuleOfCicvLocation {
  371. faultLabel = f(data)
  372. if faultLabel != "" {
  373. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  374. break
  375. }
  376. }
  377. subscribersMutexes[i].Unlock()
  378. }
  379. subscribersTimeMutexes[i].Unlock()
  380. },
  381. })
  382. }
  383. // 13
  384. if topic == masterConfig.TopicOfCloudClusters && len(masterConfig.RuleOfCloudClusters) > 0 {
  385. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  386. Node: commonConfig.RosNode,
  387. Topic: topic,
  388. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  389. subscribersTimeMutexes[i].Lock()
  390. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  391. subscribersMutexes[i].Lock()
  392. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  393. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  394. var faultLabel string
  395. for _, f := range masterConfig.RuleOfCloudClusters {
  396. faultLabel = f(data)
  397. if faultLabel != "" {
  398. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  399. subscribersTimes[i] = time.Now()
  400. break
  401. }
  402. }
  403. subscribersMutexes[i].Unlock()
  404. }
  405. subscribersTimeMutexes[i].Unlock()
  406. },
  407. })
  408. }
  409. // 14
  410. if topic == masterConfig.TopicOfHeartbeatInfo && len(masterConfig.RuleOfHeartbeatInfo) > 0 {
  411. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  412. Node: commonConfig.RosNode,
  413. Topic: topic,
  414. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  415. subscribersTimeMutexes[i].Lock()
  416. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  417. subscribersMutexes[i].Lock()
  418. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  419. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  420. var faultLabel string
  421. for _, f := range masterConfig.RuleOfHeartbeatInfo {
  422. faultLabel = f(data)
  423. if faultLabel != "" {
  424. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  425. subscribersTimes[i] = time.Now()
  426. break
  427. }
  428. }
  429. subscribersMutexes[i].Unlock()
  430. }
  431. subscribersTimeMutexes[i].Unlock()
  432. },
  433. })
  434. }
  435. // 15
  436. if topic == masterConfig.TopicOfLidarPretreatmentCost && len(masterConfig.RuleOfLidarPretreatmentCost) > 0 {
  437. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  438. Node: commonConfig.RosNode,
  439. Topic: topic,
  440. Callback: func(data *geometry_msgs.Vector3Stamped) {
  441. subscribersTimeMutexes[i].Lock()
  442. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  443. subscribersMutexes[i].Lock()
  444. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  445. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  446. var faultLabel string
  447. for _, f := range masterConfig.RuleOfLidarPretreatmentCost {
  448. faultLabel = f(data)
  449. if faultLabel != "" {
  450. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  451. subscribersTimes[i] = time.Now()
  452. break
  453. }
  454. }
  455. subscribersMutexes[i].Unlock()
  456. }
  457. subscribersTimeMutexes[i].Unlock()
  458. },
  459. })
  460. }
  461. // 16
  462. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && len(masterConfig.RuleOfLidarPretreatmentOdometry) > 0 {
  463. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  464. Node: commonConfig.RosNode,
  465. Topic: topic,
  466. Callback: func(data *nav_msgs.Odometry) {
  467. subscribersTimeMutexes[i].Lock()
  468. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  469. subscribersMutexes[i].Lock()
  470. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  471. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  472. var faultLabel string
  473. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry {
  474. faultLabel = f(data)
  475. if faultLabel != "" {
  476. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  477. subscribersTimes[i] = time.Now()
  478. break
  479. }
  480. }
  481. subscribersMutexes[i].Unlock()
  482. }
  483. subscribersTimeMutexes[i].Unlock()
  484. },
  485. })
  486. }
  487. // 17
  488. if topic == masterConfig.TopicOfLidarRoi && len(masterConfig.RuleOfLidarRoi) > 0 {
  489. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  490. Node: commonConfig.RosNode,
  491. Topic: topic,
  492. Callback: func(data *geometry_msgs.PolygonStamped) {
  493. subscribersTimeMutexes[i].Lock()
  494. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  495. subscribersMutexes[i].Lock()
  496. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  497. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  498. var faultLabel string
  499. for _, f := range masterConfig.RuleOfLidarRoi {
  500. faultLabel = f(data)
  501. if faultLabel != "" {
  502. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  503. subscribersTimes[i] = time.Now()
  504. break
  505. }
  506. }
  507. subscribersMutexes[i].Unlock()
  508. }
  509. subscribersTimeMutexes[i].Unlock()
  510. },
  511. })
  512. }
  513. // 18
  514. if topic == masterConfig.TopicOfLine1 && len(masterConfig.RuleOfLine1) > 0 {
  515. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  516. Node: commonConfig.RosNode,
  517. Topic: topic,
  518. Callback: func(data *nav_msgs.Path) {
  519. subscribersTimeMutexes[i].Lock()
  520. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  521. subscribersMutexes[i].Lock()
  522. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  523. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  524. var faultLabel string
  525. for _, f := range masterConfig.RuleOfLine1 {
  526. faultLabel = f(data)
  527. if faultLabel != "" {
  528. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  529. subscribersTimes[i] = time.Now()
  530. break
  531. }
  532. }
  533. subscribersMutexes[i].Unlock()
  534. }
  535. subscribersTimeMutexes[i].Unlock()
  536. },
  537. })
  538. }
  539. // 19
  540. if topic == masterConfig.TopicOfLine2 && len(masterConfig.RuleOfLine2) > 0 {
  541. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  542. Node: commonConfig.RosNode,
  543. Topic: topic,
  544. Callback: func(data *nav_msgs.Path) {
  545. subscribersTimeMutexes[i].Lock()
  546. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  547. subscribersMutexes[i].Lock()
  548. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  549. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  550. var faultLabel string
  551. for _, f := range masterConfig.RuleOfLine2 {
  552. faultLabel = f(data)
  553. if faultLabel != "" {
  554. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  555. subscribersTimes[i] = time.Now()
  556. break
  557. }
  558. }
  559. subscribersMutexes[i].Unlock()
  560. }
  561. subscribersTimeMutexes[i].Unlock()
  562. },
  563. })
  564. }
  565. // 20
  566. if topic == masterConfig.TopicOfMapPolygon && len(masterConfig.RuleOfMapPolygon) > 0 {
  567. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  568. Node: commonConfig.RosNode,
  569. Topic: topic,
  570. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  571. subscribersTimeMutexes[i].Lock()
  572. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  573. subscribersMutexes[i].Lock()
  574. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  575. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  576. var faultLabel string
  577. for _, f := range masterConfig.RuleOfMapPolygon {
  578. faultLabel = f(data)
  579. if faultLabel != "" {
  580. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  581. subscribersTimes[i] = time.Now()
  582. break
  583. }
  584. }
  585. subscribersMutexes[i].Unlock()
  586. }
  587. subscribersTimeMutexes[i].Unlock()
  588. },
  589. })
  590. }
  591. // 21
  592. if topic == masterConfig.TopicOfObstacleDisplay && len(masterConfig.RuleOfObstacleDisplay) > 0 {
  593. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  594. Node: commonConfig.RosNode,
  595. Topic: topic,
  596. Callback: func(data *visualization_msgs.MarkerArray) {
  597. subscribersTimeMutexes[i].Lock()
  598. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  599. subscribersMutexes[i].Lock()
  600. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  601. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  602. var faultLabel string
  603. for _, f := range masterConfig.RuleOfObstacleDisplay {
  604. faultLabel = f(data)
  605. if faultLabel != "" {
  606. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  607. subscribersTimes[i] = time.Now()
  608. break
  609. }
  610. }
  611. subscribersMutexes[i].Unlock()
  612. }
  613. subscribersTimeMutexes[i].Unlock()
  614. },
  615. })
  616. }
  617. // 22
  618. if topic == masterConfig.TopicOfPjControlPub && len(masterConfig.RuleOfPjControlPub) > 0 {
  619. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  620. Node: commonConfig.RosNode,
  621. Topic: topic,
  622. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  623. // 更新共享变量
  624. mutexOfPjControlPub.RLock()
  625. {
  626. pjisuvParam.NumCountPjiControlCommandOfPjControlPub++
  627. if pjisuvParam.NumCountPjiControlCommandOfPjControlPub == 10 {
  628. pjisuvParam.EgoSteeringCmdOfPjControlPub = append(pjisuvParam.EgoSteeringCmdOfPjControlPub, data.ICPVCmdStrAngle)
  629. pjisuvParam.EgoThrottleCmdOfPjControlPub = append(pjisuvParam.EgoThrottleCmdOfPjControlPub, data.ICPVCmdAccPelPosAct)
  630. pjisuvParam.NumCountPjiControlCommandOfPjControlPub = 0
  631. }
  632. }
  633. mutexOfPjControlPub.RUnlock()
  634. subscribersTimeMutexes[i].Lock()
  635. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  636. subscribersMutexes[i].Lock()
  637. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  638. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  639. var faultLabel string
  640. for _, f := range masterConfig.RuleOfPjControlPub {
  641. faultLabel = f(data)
  642. if faultLabel != "" {
  643. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  644. subscribersTimes[i] = time.Now()
  645. break
  646. }
  647. }
  648. subscribersMutexes[i].Unlock()
  649. }
  650. subscribersTimeMutexes[i].Unlock()
  651. },
  652. })
  653. }
  654. // 23
  655. if topic == masterConfig.TopicOfPointsCluster && len(masterConfig.RuleOfPointsCluster) > 0 {
  656. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  657. Node: commonConfig.RosNode,
  658. Topic: topic,
  659. Callback: func(data *sensor_msgs.PointCloud2) {
  660. subscribersTimeMutexes[i].Lock()
  661. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  662. subscribersMutexes[i].Lock()
  663. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  664. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  665. var faultLabel string
  666. for _, f := range masterConfig.RuleOfPointsCluster {
  667. faultLabel = f(data)
  668. if faultLabel != "" {
  669. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  670. subscribersTimes[i] = time.Now()
  671. break
  672. }
  673. }
  674. subscribersMutexes[i].Unlock()
  675. }
  676. subscribersTimeMutexes[i].Unlock()
  677. },
  678. })
  679. }
  680. // 24
  681. if topic == masterConfig.TopicOfPointsConcat && len(masterConfig.RuleOfPointsConcat) > 0 {
  682. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  683. Node: commonConfig.RosNode,
  684. Topic: topic,
  685. Callback: func(data *sensor_msgs.PointCloud2) {
  686. subscribersTimeMutexes[i].Lock()
  687. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  688. subscribersMutexes[i].Lock()
  689. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  690. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  691. var faultLabel string
  692. for _, f := range masterConfig.RuleOfPointsConcat {
  693. faultLabel = f(data)
  694. if faultLabel != "" {
  695. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  696. subscribersTimes[i] = time.Now()
  697. break
  698. }
  699. }
  700. subscribersMutexes[i].Unlock()
  701. }
  702. subscribersTimeMutexes[i].Unlock()
  703. },
  704. })
  705. }
  706. // 25
  707. if topic == masterConfig.TopicOfReferenceDisplay && len(masterConfig.RuleOfReferenceDisplay) > 0 {
  708. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  709. Node: commonConfig.RosNode,
  710. Topic: topic,
  711. Callback: func(data *nav_msgs.Path) {
  712. subscribersTimeMutexes[i].Lock()
  713. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  714. subscribersMutexes[i].Lock()
  715. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  716. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  717. var faultLabel string
  718. for _, f := range masterConfig.RuleOfReferenceDisplay {
  719. faultLabel = f(data)
  720. if faultLabel != "" {
  721. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  722. subscribersTimes[i] = time.Now()
  723. break
  724. }
  725. }
  726. subscribersMutexes[i].Unlock()
  727. }
  728. subscribersTimeMutexes[i].Unlock()
  729. },
  730. })
  731. }
  732. // 26
  733. if topic == masterConfig.TopicOfReferenceTrajectory && len(masterConfig.RuleOfReferenceTrajectory) > 0 {
  734. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  735. Node: commonConfig.RosNode,
  736. Topic: topic,
  737. Callback: func(data *pjisuv_msgs.Trajectory) {
  738. subscribersTimeMutexes[i].Lock()
  739. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  740. subscribersMutexes[i].Lock()
  741. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  742. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  743. var faultLabel string
  744. for _, f := range masterConfig.RuleOfReferenceTrajectory {
  745. faultLabel = f(data)
  746. if faultLabel != "" {
  747. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  748. subscribersTimes[i] = time.Now()
  749. break
  750. }
  751. }
  752. subscribersMutexes[i].Unlock()
  753. }
  754. subscribersTimeMutexes[i].Unlock()
  755. },
  756. })
  757. }
  758. // 27
  759. if topic == masterConfig.TopicOfRoiPoints && len(masterConfig.RuleOfRoiPoints) > 0 {
  760. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  761. Node: commonConfig.RosNode,
  762. Topic: topic,
  763. Callback: func(data *sensor_msgs.PointCloud2) {
  764. subscribersTimeMutexes[i].Lock()
  765. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  766. subscribersMutexes[i].Lock()
  767. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  768. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  769. var faultLabel string
  770. for _, f := range masterConfig.RuleOfRoiPoints {
  771. faultLabel = f(data)
  772. if faultLabel != "" {
  773. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  774. subscribersTimes[i] = time.Now()
  775. break
  776. }
  777. }
  778. subscribersMutexes[i].Unlock()
  779. }
  780. subscribersTimeMutexes[i].Unlock()
  781. },
  782. })
  783. }
  784. // 28
  785. if topic == masterConfig.TopicOfRoiPolygon && len(masterConfig.RuleOfRoiPolygon) > 0 {
  786. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  787. Node: commonConfig.RosNode,
  788. Topic: topic,
  789. Callback: func(data *nav_msgs.Path) {
  790. subscribersTimeMutexes[i].Lock()
  791. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  792. subscribersMutexes[i].Lock()
  793. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  794. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  795. var faultLabel string
  796. for _, f := range masterConfig.RuleOfRoiPolygon {
  797. faultLabel = f(data)
  798. if faultLabel != "" {
  799. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  800. subscribersTimes[i] = time.Now()
  801. break
  802. }
  803. }
  804. subscribersMutexes[i].Unlock()
  805. }
  806. subscribersTimeMutexes[i].Unlock()
  807. },
  808. })
  809. }
  810. // 29
  811. if topic == masterConfig.TopicOfTf && len(masterConfig.RuleOfTf) > 0 {
  812. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  813. Node: commonConfig.RosNode,
  814. Topic: topic,
  815. Callback: func(data *tf2_msgs.TFMessage) {
  816. subscribersTimeMutexes[i].Lock()
  817. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  818. subscribersMutexes[i].Lock()
  819. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  820. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  821. var faultLabel string
  822. for _, f := range masterConfig.RuleOfTf {
  823. faultLabel = f(data)
  824. if faultLabel != "" {
  825. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  826. subscribersTimes[i] = time.Now()
  827. break
  828. }
  829. }
  830. subscribersMutexes[i].Unlock()
  831. }
  832. subscribersTimeMutexes[i].Unlock()
  833. },
  834. })
  835. }
  836. // 30
  837. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  838. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  839. Node: commonConfig.RosNode,
  840. Topic: topic,
  841. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  842. // 更新共享变量
  843. mutexOfTpperception.RLock()
  844. {
  845. for _, obj := range data.Objs {
  846. if obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  847. continue
  848. }
  849. // 检查 ObjDicOfTpperception 是否为 nil,如果是,则初始化它
  850. if pjisuvParam.ObjDicOfTpperception == nil {
  851. pjisuvParam.ObjDicOfTpperception = make(map[uint32][]float32)
  852. }
  853. if _, ok := pjisuvParam.ObjDicOfTpperception[obj.Id]; !ok {
  854. pjisuvParam.ObjDicOfTpperception[obj.Id] = []float32{}
  855. }
  856. pjisuvParam.ObjDicOfTpperception[obj.Id] = append(pjisuvParam.ObjDicOfTpperception[obj.Id], obj.Y)
  857. }
  858. }
  859. mutexOfTpperception.RUnlock()
  860. subscribersTimeMutexes[i].Lock()
  861. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  862. subscribersMutexes[i].Lock()
  863. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  864. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  865. var faultLabel string
  866. for _, f := range masterConfig.RuleOfTpperception {
  867. faultLabel = f(data, pjisuvParam)
  868. if faultLabel != "" {
  869. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  870. break
  871. }
  872. }
  873. subscribersMutexes[i].Unlock()
  874. }
  875. subscribersTimeMutexes[i].Unlock()
  876. // -------- 触发后更新共享变量
  877. mutexOfTpperception.RLock()
  878. {
  879. for _, obj := range data.Objs {
  880. pjisuvParam.ObjTypeDicOfTpperception[obj.Id] = obj.Type
  881. pjisuvParam.ObjSpeedDicOfTpperception[obj.Id] = math.Pow(math.Pow(float64(obj.Vxabs), 2)+math.Pow(float64(obj.Vyabs), 2), 0.5)
  882. }
  883. }
  884. mutexOfTpperception.RUnlock()
  885. },
  886. })
  887. }
  888. // 31
  889. if topic == masterConfig.TopicOfTpperceptionVis && len(masterConfig.RuleOfTpperceptionVis) > 0 {
  890. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  891. Node: commonConfig.RosNode,
  892. Topic: topic,
  893. Callback: func(data *visualization_msgs.MarkerArray) {
  894. subscribersTimeMutexes[i].Lock()
  895. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  896. subscribersMutexes[i].Lock()
  897. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  898. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  899. var faultLabel string
  900. for _, f := range masterConfig.RuleOfTpperceptionVis {
  901. faultLabel = f(data)
  902. if faultLabel != "" {
  903. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  904. subscribersTimes[i] = time.Now()
  905. break
  906. }
  907. }
  908. subscribersMutexes[i].Unlock()
  909. }
  910. subscribersTimeMutexes[i].Unlock()
  911. },
  912. })
  913. }
  914. // 32
  915. if topic == masterConfig.TopicOfTprouteplan && len(masterConfig.RuleOfTprouteplan) > 0 {
  916. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  917. Node: commonConfig.RosNode,
  918. Topic: topic,
  919. Callback: func(data *pjisuv_msgs.RoutePlan) {
  920. subscribersTimeMutexes[i].Lock()
  921. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  922. subscribersMutexes[i].Lock()
  923. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  924. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  925. var faultLabel string
  926. for _, f := range masterConfig.RuleOfTprouteplan {
  927. faultLabel = f(data)
  928. if faultLabel != "" {
  929. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  930. subscribersTimes[i] = time.Now()
  931. break
  932. }
  933. }
  934. subscribersMutexes[i].Unlock()
  935. }
  936. subscribersTimeMutexes[i].Unlock()
  937. },
  938. })
  939. }
  940. // 33
  941. if topic == masterConfig.TopicOfTrajectoryDisplay && len(masterConfig.RuleOfTrajectoryDisplay) > 0 {
  942. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  943. Node: commonConfig.RosNode,
  944. Topic: topic,
  945. Callback: func(data *nav_msgs.Path) {
  946. subscribersTimeMutexes[i].Lock()
  947. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  948. subscribersMutexes[i].Lock()
  949. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  950. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  951. var faultLabel string
  952. for _, f := range masterConfig.RuleOfTrajectoryDisplay {
  953. faultLabel = f(data)
  954. if faultLabel != "" {
  955. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  956. subscribersTimes[i] = time.Now()
  957. break
  958. }
  959. }
  960. subscribersMutexes[i].Unlock()
  961. }
  962. subscribersTimeMutexes[i].Unlock()
  963. },
  964. })
  965. }
  966. // 34
  967. if topic == masterConfig.TopicOfUngroundCloudpoints && len(masterConfig.RuleOfUngroundCloudpoints) > 0 {
  968. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  969. Node: commonConfig.RosNode,
  970. Topic: topic,
  971. Callback: func(data *sensor_msgs.PointCloud2) {
  972. subscribersTimeMutexes[i].Lock()
  973. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  974. subscribersMutexes[i].Lock()
  975. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  976. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  977. var faultLabel string
  978. for _, f := range masterConfig.RuleOfUngroundCloudpoints {
  979. faultLabel = f(data)
  980. if faultLabel != "" {
  981. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  982. subscribersTimes[i] = time.Now()
  983. break
  984. }
  985. }
  986. subscribersMutexes[i].Unlock()
  987. }
  988. subscribersTimeMutexes[i].Unlock()
  989. },
  990. })
  991. }
  992. // 35
  993. if topic == masterConfig.TopicOfCameraImage && len(masterConfig.RuleOfCameraImage) > 0 {
  994. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  995. Node: commonConfig.RosNode,
  996. Topic: topic,
  997. Callback: func(data *sensor_msgs.Image) {
  998. subscribersTimeMutexes[i].Lock()
  999. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1000. subscribersMutexes[i].Lock()
  1001. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1002. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1003. var faultLabel string
  1004. for _, f := range masterConfig.RuleOfCameraImage {
  1005. faultLabel = f(data)
  1006. if faultLabel != "" {
  1007. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1008. subscribersTimes[i] = time.Now()
  1009. break
  1010. }
  1011. }
  1012. subscribersMutexes[i].Unlock()
  1013. }
  1014. subscribersTimeMutexes[i].Unlock()
  1015. },
  1016. })
  1017. }
  1018. // 36
  1019. if topic == masterConfig.TopicOfDataRead && len(masterConfig.RuleOfDataRead) > 0 {
  1020. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1021. Node: commonConfig.RosNode,
  1022. Topic: topic,
  1023. Callback: func(data *pjisuv_msgs.Retrieval) {
  1024. // 更新共享变量
  1025. mutexOfDataRead.RLock()
  1026. {
  1027. pjisuvParam.NumCountDataReadOfDataRead++
  1028. if pjisuvParam.NumCountDataReadOfDataRead == 10 {
  1029. pjisuvParam.EgoSteeringRealOfDataRead = append(pjisuvParam.EgoSteeringRealOfDataRead, data.ActStrWhAng)
  1030. pjisuvParam.EgoThrottleRealOfDataRead = append(pjisuvParam.EgoThrottleRealOfDataRead, data.AccPed2)
  1031. pjisuvParam.NumCountDataReadOfDataRead = 0
  1032. }
  1033. pjisuvParam.StrgAngleRealValueOfDataRead = data.ActStrWhAng
  1034. }
  1035. mutexOfDataRead.RUnlock()
  1036. subscribersTimeMutexes[i].Lock()
  1037. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1038. subscribersMutexes[i].Lock()
  1039. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1040. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1041. var faultLabel string
  1042. for _, f := range masterConfig.RuleOfDataRead {
  1043. faultLabel = f(data)
  1044. if faultLabel != "" {
  1045. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1046. subscribersTimes[i] = time.Now()
  1047. break
  1048. }
  1049. }
  1050. subscribersMutexes[i].Unlock()
  1051. }
  1052. subscribersTimeMutexes[i].Unlock()
  1053. },
  1054. })
  1055. }
  1056. // 37
  1057. if topic == masterConfig.TopicOfPjiGps && len(masterConfig.RuleOfPjiGps) > 0 {
  1058. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1059. Node: commonConfig.RosNode,
  1060. Topic: topic,
  1061. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  1062. subscribersTimeMutexes[i].Lock()
  1063. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1064. subscribersMutexes[i].Lock()
  1065. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1066. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1067. var faultLabel string
  1068. for _, f := range masterConfig.RuleOfPjiGps {
  1069. faultLabel = f(data)
  1070. if faultLabel != "" {
  1071. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1072. subscribersTimes[i] = time.Now()
  1073. break
  1074. }
  1075. }
  1076. subscribersMutexes[i].Unlock()
  1077. }
  1078. subscribersTimeMutexes[i].Unlock()
  1079. },
  1080. })
  1081. }
  1082. // 39
  1083. if topic == masterConfig.TopicOfPjVehicleFdbPub && len(masterConfig.RuleOfPjVehicleFdbPub) > 0 {
  1084. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1085. Node: commonConfig.RosNode,
  1086. Topic: topic,
  1087. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  1088. subscribersTimeMutexes[i].Lock()
  1089. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1090. subscribersMutexes[i].Lock()
  1091. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1092. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1093. var faultLabel string
  1094. for _, f := range masterConfig.RuleOfPjVehicleFdbPub {
  1095. faultLabel = f(data, &pjisuvParam)
  1096. if faultLabel != "" {
  1097. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1098. subscribersTimes[i] = time.Now()
  1099. break
  1100. }
  1101. }
  1102. subscribersMutexes[i].Unlock()
  1103. }
  1104. subscribersTimeMutexes[i].Unlock()
  1105. },
  1106. })
  1107. }
  1108. if err != nil {
  1109. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1110. //TODO 如何回传日志
  1111. continue
  1112. }
  1113. }
  1114. select {
  1115. case signal := <-service.ChannelKillWindowProducer:
  1116. if signal == 1 {
  1117. commonConfig.RosNode.Close()
  1118. service.AddKillTimes("3")
  1119. return
  1120. }
  1121. }
  1122. }
  1123. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  1124. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1125. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  1126. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  1127. newTimeWindow := commonEntity.TimeWindow{
  1128. FaultTime: faultHappenTime,
  1129. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  1130. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  1131. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  1132. Labels: []string{faultLabel},
  1133. MasterTopics: masterTopics,
  1134. SlaveTopics: slaveTopics,
  1135. }
  1136. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1137. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1138. } else {
  1139. // 2-2 如果在旧故障窗口内
  1140. commonEntity.TimeWindowProducerQueueMutex.RLock()
  1141. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  1142. // 2-2-1 更新故障窗口end时间
  1143. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  1144. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1145. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  1146. lastTimeWindow.TimeWindowEnd = maxEnd
  1147. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  1148. } else {
  1149. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1150. lastTimeWindow.TimeWindowEnd = expectEnd
  1151. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1152. }
  1153. }
  1154. // 2-2-2 更新label
  1155. labels := lastTimeWindow.Labels
  1156. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1157. // 2-2-3 更新 topic
  1158. sourceMasterTopics := lastTimeWindow.MasterTopics
  1159. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1160. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1161. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1162. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1163. }
  1164. }
  1165. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1166. // 获取所有需要采集的topic
  1167. var faultCodeTopics []string
  1168. for _, code := range commonConfig.CloudConfig.Triggers {
  1169. if code.Label == faultLabel {
  1170. faultCodeTopics = code.Topics
  1171. }
  1172. }
  1173. // 根据不同节点采集的topic进行分配采集
  1174. for _, acceptTopic := range faultCodeTopics {
  1175. for _, host := range commonConfig.CloudConfig.Hosts {
  1176. for _, topic := range host.Topics {
  1177. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1178. masterTopics = append(masterTopics, acceptTopic)
  1179. }
  1180. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1181. slaveTopics = append(slaveTopics, acceptTopic)
  1182. }
  1183. }
  1184. }
  1185. }
  1186. return masterTopics, slaveTopics
  1187. }