produce_window.go 45 KB


  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "github.com/bluenviron/goroslib/v2"
  11. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  16. "math"
  17. "sync"
  18. "time"
  19. )
  20. // 所有共享变量
  21. var (
  22. pjisuvParam commonEntity.PjisuvParam
  23. // /cicv_location
  24. mutexOfCicvLocation sync.RWMutex
  25. // /tpperception
  26. mutexOfTpperception sync.RWMutex
  27. // /pj_control_pub
  28. mutexOfPjControlPub sync.RWMutex
  29. // /data_read
  30. mutexOfDataRead sync.RWMutex
  31. // /pj_vehicle_fdb_pub
  32. mutexOfPjVehicleFdbPub sync.RWMutex
  33. // /pj_vehicle_fdb_pub
  34. mutexOfCicvAmrTrajectory sync.RWMutex
  35. )
  36. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  37. func PrepareTimeWindowProducerQueue() {
  38. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  39. var err error
  40. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  41. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  42. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  43. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  44. for i, topic := range commonConfig.SubscribeTopics {
  45. // !!!扩展的定时任务监听,牛逼的设计!!!扩展性拉满啦
  46. if topic == masterConfig.TopicOfCicvExtend {
  47. go func() {
  48. for {
  49. time.Sleep(time.Duration(3500) * time.Millisecond)
  50. for _, f := range masterConfig.RuleOfCicvExtend {
  51. label := f(pjisuvParam)
  52. if label != "" {
  53. saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
  54. break
  55. }
  56. }
  57. }
  58. }()
  59. }
  60. // 其他常规监听器
  61. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  62. // 1
  63. if topic == masterConfig.TopicOfAmrPose && len(masterConfig.RuleOfAmrPose) > 0 {
  64. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  65. Node: commonConfig.RosNode,
  66. Topic: topic,
  67. Callback: func(data *visualization_msgs.MarkerArray) {
  68. subscribersTimeMutexes[i].Lock()
  69. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  70. subscribersMutexes[i].Lock()
  71. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  72. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  73. var faultLabel string
  74. for _, f := range masterConfig.RuleOfAmrPose {
  75. faultLabel = f(data)
  76. if faultLabel != "" {
  77. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  78. subscribersTimes[i] = time.Now()
  79. break
  80. }
  81. }
  82. subscribersMutexes[i].Unlock()
  83. }
  84. subscribersTimeMutexes[i].Unlock()
  85. },
  86. })
  87. }
  88. // 2
  89. if topic == masterConfig.TopicOfBoundingBoxesFast && len(masterConfig.RuleOfBoundingBoxesFast) > 0 {
  90. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  91. Node: commonConfig.RosNode,
  92. Topic: topic,
  93. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  94. subscribersTimeMutexes[i].Lock()
  95. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  96. subscribersMutexes[i].Lock()
  97. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  98. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  99. var faultLabel string
  100. for _, f := range masterConfig.RuleOfBoundingBoxesFast {
  101. faultLabel = f(data)
  102. if faultLabel != "" {
  103. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  104. subscribersTimes[i] = time.Now()
  105. break
  106. }
  107. }
  108. subscribersMutexes[i].Unlock()
  109. }
  110. subscribersTimeMutexes[i].Unlock()
  111. },
  112. })
  113. }
  114. // 3
  115. if topic == masterConfig.TopicOfCameraFault && len(masterConfig.RuleOfCameraFault) > 0 {
  116. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  117. Node: commonConfig.RosNode,
  118. Topic: topic,
  119. Callback: func(data *pjisuv_msgs.FaultVec) {
  120. subscribersTimeMutexes[i].Lock()
  121. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  122. subscribersMutexes[i].Lock()
  123. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  124. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  125. var faultLabel string
  126. for _, f := range masterConfig.RuleOfCameraFault {
  127. faultLabel = f(data)
  128. if faultLabel != "" {
  129. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  130. subscribersTimes[i] = time.Now()
  131. break
  132. }
  133. }
  134. subscribersMutexes[i].Unlock()
  135. }
  136. subscribersTimeMutexes[i].Unlock()
  137. },
  138. })
  139. }
  140. // 4
  141. if topic == masterConfig.TopicOfCanData && len(masterConfig.RuleOfCanData) > 0 {
  142. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  143. Node: commonConfig.RosNode,
  144. Topic: topic,
  145. Callback: func(data *pjisuv_msgs.Frame) {
  146. subscribersTimeMutexes[i].Lock()
  147. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  148. subscribersMutexes[i].Lock()
  149. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  150. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  151. var faultLabel string
  152. for _, f := range masterConfig.RuleOfCanData {
  153. faultLabel = f(data)
  154. if faultLabel != "" {
  155. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  156. subscribersTimes[i] = time.Now()
  157. break
  158. }
  159. }
  160. subscribersMutexes[i].Unlock()
  161. }
  162. subscribersTimeMutexes[i].Unlock()
  163. },
  164. })
  165. }
  166. // 5
  167. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && len(masterConfig.RuleOfCh128x1LslidarPointCloud) > 0 {
  168. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  169. Node: commonConfig.RosNode,
  170. Topic: topic,
  171. Callback: func(data *sensor_msgs.PointCloud2) {
  172. subscribersTimeMutexes[i].Lock()
  173. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  174. subscribersMutexes[i].Lock()
  175. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  176. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  177. var faultLabel string
  178. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud {
  179. faultLabel = f(data)
  180. if faultLabel != "" {
  181. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  182. subscribersTimes[i] = time.Now()
  183. break
  184. }
  185. }
  186. subscribersMutexes[i].Unlock()
  187. }
  188. subscribersTimeMutexes[i].Unlock()
  189. },
  190. })
  191. }
  192. // 6
  193. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && len(masterConfig.RuleOfCh64wLLslidarPointCloud) > 0 {
  194. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  195. Node: commonConfig.RosNode,
  196. Topic: topic,
  197. Callback: func(data *sensor_msgs.PointCloud2) {
  198. subscribersTimeMutexes[i].Lock()
  199. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  200. subscribersMutexes[i].Lock()
  201. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  202. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  203. var faultLabel string
  204. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud {
  205. faultLabel = f(data)
  206. if faultLabel != "" {
  207. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  208. subscribersTimes[i] = time.Now()
  209. break
  210. }
  211. }
  212. subscribersMutexes[i].Unlock()
  213. }
  214. subscribersTimeMutexes[i].Unlock()
  215. },
  216. })
  217. }
  218. // 7
  219. if topic == masterConfig.TopicOfCh64wLScan && len(masterConfig.RuleOfCh64wLScan) > 0 {
  220. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  221. Node: commonConfig.RosNode,
  222. Topic: topic,
  223. Callback: func(data *sensor_msgs.LaserScan) {
  224. subscribersTimeMutexes[i].Lock()
  225. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  226. subscribersMutexes[i].Lock()
  227. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  228. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  229. var faultLabel string
  230. for _, f := range masterConfig.RuleOfCh64wLScan {
  231. faultLabel = f(data)
  232. if faultLabel != "" {
  233. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  234. subscribersTimes[i] = time.Now()
  235. break
  236. }
  237. }
  238. subscribersMutexes[i].Unlock()
  239. }
  240. subscribersTimeMutexes[i].Unlock()
  241. },
  242. })
  243. }
  244. // 8
  245. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && len(masterConfig.RuleOfCh64wRLslidarPointCloud) > 0 {
  246. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  247. Node: commonConfig.RosNode,
  248. Topic: topic,
  249. Callback: func(data *sensor_msgs.PointCloud2) {
  250. subscribersTimeMutexes[i].Lock()
  251. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  252. subscribersMutexes[i].Lock()
  253. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  254. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  255. var faultLabel string
  256. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud {
  257. faultLabel = f(data)
  258. if faultLabel != "" {
  259. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  260. subscribersTimes[i] = time.Now()
  261. break
  262. }
  263. }
  264. subscribersMutexes[i].Unlock()
  265. }
  266. subscribersTimeMutexes[i].Unlock()
  267. },
  268. })
  269. }
  270. // 9
  271. if topic == masterConfig.TopicOfCh64wRScan && len(masterConfig.RuleOfCh64wRScan) > 0 {
  272. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  273. Node: commonConfig.RosNode,
  274. Topic: topic,
  275. Callback: func(data *sensor_msgs.LaserScan) {
  276. subscribersTimeMutexes[i].Lock()
  277. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  278. subscribersMutexes[i].Lock()
  279. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  280. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  281. var faultLabel string
  282. for _, f := range masterConfig.RuleOfCh64wRScan {
  283. faultLabel = f(data)
  284. if faultLabel != "" {
  285. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  286. subscribersTimes[i] = time.Now()
  287. break
  288. }
  289. }
  290. subscribersMutexes[i].Unlock()
  291. }
  292. subscribersTimeMutexes[i].Unlock()
  293. },
  294. })
  295. }
  296. // 10
  297. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && len(masterConfig.RuleOfCicvLidarclusterMovingObjects) > 0 {
  298. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  299. Node: commonConfig.RosNode,
  300. Topic: topic,
  301. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  302. subscribersTimeMutexes[i].Lock()
  303. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  304. subscribersMutexes[i].Lock()
  305. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  306. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  307. var faultLabel string
  308. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects {
  309. faultLabel = f(data)
  310. if faultLabel != "" {
  311. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  312. subscribersTimes[i] = time.Now()
  313. break
  314. }
  315. }
  316. subscribersMutexes[i].Unlock()
  317. }
  318. subscribersTimeMutexes[i].Unlock()
  319. },
  320. })
  321. }
  322. // 11
  323. if topic == masterConfig.TopicOfCicvAmrTrajectory && len(masterConfig.RuleOfCicvAmrTrajectory) > 0 {
  324. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  325. Node: commonConfig.RosNode,
  326. Topic: topic,
  327. Callback: func(data *pjisuv_msgs.Trajectory) {
  328. subscribersTimeMutexes[i].Lock()
  329. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  330. subscribersMutexes[i].Lock()
  331. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  332. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  333. var faultLabel string
  334. for _, f := range masterConfig.RuleOfCicvAmrTrajectory {
  335. faultLabel = f(data, pjisuvParam)
  336. if faultLabel != "" {
  337. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  338. subscribersTimes[i] = time.Now()
  339. break
  340. }
  341. }
  342. subscribersMutexes[i].Unlock()
  343. }
  344. subscribersTimeMutexes[i].Unlock()
  345. // 触发后更新共享变量
  346. mutexOfCicvAmrTrajectory.RLock()
  347. {
  348. var currentCurvateres []float64
  349. for _, point := range data.Trajectoryinfo.Trajectorypoints {
  350. currentCurvateres = append(currentCurvateres, math.Abs(float64(point.Curvature)))
  351. }
  352. pjisuvParam.LastCurvaturesOfCicvAmrTrajectory = currentCurvateres
  353. }
  354. mutexOfCicvAmrTrajectory.RUnlock()
  355. },
  356. })
  357. }
  358. // 12
  359. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  360. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  361. Node: commonConfig.RosNode,
  362. Topic: topic,
  363. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  364. // 更新共享变量
  365. mutexOfCicvLocation.RLock()
  366. {
  367. pjisuvParam.VelocityYOfCicvLocation = data.VelocityX
  368. pjisuvParam.VelocityYOfCicvLocation = data.VelocityY
  369. pjisuvParam.VelocityYOfCicvLocation = data.VelocityZ
  370. pjisuvParam.YawOfCicvLocation = data.Yaw
  371. pjisuvParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
  372. pjisuvParam.PositionXOfCicvLocation = data.PositionX
  373. pjisuvParam.PositionYOfCicvLocation = data.PositionY
  374. }
  375. mutexOfCicvLocation.RUnlock()
  376. subscribersTimeMutexes[i].Lock()
  377. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  378. subscribersMutexes[i].Lock()
  379. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  380. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  381. var faultLabel string
  382. for _, f := range masterConfig.RuleOfCicvLocation {
  383. faultLabel = f(data)
  384. if faultLabel != "" {
  385. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  386. break
  387. }
  388. }
  389. subscribersMutexes[i].Unlock()
  390. }
  391. subscribersTimeMutexes[i].Unlock()
  392. },
  393. })
  394. }
  395. // 13
  396. if topic == masterConfig.TopicOfCloudClusters && len(masterConfig.RuleOfCloudClusters) > 0 {
  397. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  398. Node: commonConfig.RosNode,
  399. Topic: topic,
  400. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  401. subscribersTimeMutexes[i].Lock()
  402. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  403. subscribersMutexes[i].Lock()
  404. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  405. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  406. var faultLabel string
  407. for _, f := range masterConfig.RuleOfCloudClusters {
  408. faultLabel = f(data)
  409. if faultLabel != "" {
  410. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  411. subscribersTimes[i] = time.Now()
  412. break
  413. }
  414. }
  415. subscribersMutexes[i].Unlock()
  416. }
  417. subscribersTimeMutexes[i].Unlock()
  418. },
  419. })
  420. }
  421. // 14
  422. if topic == masterConfig.TopicOfHeartbeatInfo && len(masterConfig.RuleOfHeartbeatInfo) > 0 {
  423. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  424. Node: commonConfig.RosNode,
  425. Topic: topic,
  426. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  427. subscribersTimeMutexes[i].Lock()
  428. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  429. subscribersMutexes[i].Lock()
  430. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  431. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  432. var faultLabel string
  433. for _, f := range masterConfig.RuleOfHeartbeatInfo {
  434. faultLabel = f(data)
  435. if faultLabel != "" {
  436. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  437. subscribersTimes[i] = time.Now()
  438. break
  439. }
  440. }
  441. subscribersMutexes[i].Unlock()
  442. }
  443. subscribersTimeMutexes[i].Unlock()
  444. },
  445. })
  446. }
  447. // 15
  448. if topic == masterConfig.TopicOfLidarPretreatmentCost && len(masterConfig.RuleOfLidarPretreatmentCost) > 0 {
  449. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  450. Node: commonConfig.RosNode,
  451. Topic: topic,
  452. Callback: func(data *geometry_msgs.Vector3Stamped) {
  453. subscribersTimeMutexes[i].Lock()
  454. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  455. subscribersMutexes[i].Lock()
  456. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  457. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  458. var faultLabel string
  459. for _, f := range masterConfig.RuleOfLidarPretreatmentCost {
  460. faultLabel = f(data)
  461. if faultLabel != "" {
  462. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  463. subscribersTimes[i] = time.Now()
  464. break
  465. }
  466. }
  467. subscribersMutexes[i].Unlock()
  468. }
  469. subscribersTimeMutexes[i].Unlock()
  470. },
  471. })
  472. }
  473. // 16
  474. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && len(masterConfig.RuleOfLidarPretreatmentOdometry) > 0 {
  475. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  476. Node: commonConfig.RosNode,
  477. Topic: topic,
  478. Callback: func(data *nav_msgs.Odometry) {
  479. subscribersTimeMutexes[i].Lock()
  480. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  481. subscribersMutexes[i].Lock()
  482. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  483. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  484. var faultLabel string
  485. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry {
  486. faultLabel = f(data)
  487. if faultLabel != "" {
  488. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  489. subscribersTimes[i] = time.Now()
  490. break
  491. }
  492. }
  493. subscribersMutexes[i].Unlock()
  494. }
  495. subscribersTimeMutexes[i].Unlock()
  496. },
  497. })
  498. }
  499. // 17
  500. if topic == masterConfig.TopicOfLidarRoi && len(masterConfig.RuleOfLidarRoi) > 0 {
  501. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  502. Node: commonConfig.RosNode,
  503. Topic: topic,
  504. Callback: func(data *geometry_msgs.PolygonStamped) {
  505. subscribersTimeMutexes[i].Lock()
  506. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  507. subscribersMutexes[i].Lock()
  508. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  509. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  510. var faultLabel string
  511. for _, f := range masterConfig.RuleOfLidarRoi {
  512. faultLabel = f(data)
  513. if faultLabel != "" {
  514. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  515. subscribersTimes[i] = time.Now()
  516. break
  517. }
  518. }
  519. subscribersMutexes[i].Unlock()
  520. }
  521. subscribersTimeMutexes[i].Unlock()
  522. },
  523. })
  524. }
  525. // 18
  526. if topic == masterConfig.TopicOfLine1 && len(masterConfig.RuleOfLine1) > 0 {
  527. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  528. Node: commonConfig.RosNode,
  529. Topic: topic,
  530. Callback: func(data *nav_msgs.Path) {
  531. subscribersTimeMutexes[i].Lock()
  532. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  533. subscribersMutexes[i].Lock()
  534. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  535. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  536. var faultLabel string
  537. for _, f := range masterConfig.RuleOfLine1 {
  538. faultLabel = f(data)
  539. if faultLabel != "" {
  540. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  541. subscribersTimes[i] = time.Now()
  542. break
  543. }
  544. }
  545. subscribersMutexes[i].Unlock()
  546. }
  547. subscribersTimeMutexes[i].Unlock()
  548. },
  549. })
  550. }
  551. // 19
  552. if topic == masterConfig.TopicOfLine2 && len(masterConfig.RuleOfLine2) > 0 {
  553. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  554. Node: commonConfig.RosNode,
  555. Topic: topic,
  556. Callback: func(data *nav_msgs.Path) {
  557. subscribersTimeMutexes[i].Lock()
  558. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  559. subscribersMutexes[i].Lock()
  560. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  561. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  562. var faultLabel string
  563. for _, f := range masterConfig.RuleOfLine2 {
  564. faultLabel = f(data)
  565. if faultLabel != "" {
  566. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  567. subscribersTimes[i] = time.Now()
  568. break
  569. }
  570. }
  571. subscribersMutexes[i].Unlock()
  572. }
  573. subscribersTimeMutexes[i].Unlock()
  574. },
  575. })
  576. }
  577. // 20
  578. if topic == masterConfig.TopicOfMapPolygon && len(masterConfig.RuleOfMapPolygon) > 0 {
  579. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  580. Node: commonConfig.RosNode,
  581. Topic: topic,
  582. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  583. subscribersTimeMutexes[i].Lock()
  584. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  585. subscribersMutexes[i].Lock()
  586. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  587. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  588. var faultLabel string
  589. for _, f := range masterConfig.RuleOfMapPolygon {
  590. faultLabel = f(data)
  591. if faultLabel != "" {
  592. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  593. subscribersTimes[i] = time.Now()
  594. break
  595. }
  596. }
  597. subscribersMutexes[i].Unlock()
  598. }
  599. subscribersTimeMutexes[i].Unlock()
  600. },
  601. })
  602. }
  603. // 21
  604. if topic == masterConfig.TopicOfObstacleDisplay && len(masterConfig.RuleOfObstacleDisplay) > 0 {
  605. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  606. Node: commonConfig.RosNode,
  607. Topic: topic,
  608. Callback: func(data *visualization_msgs.MarkerArray) {
  609. subscribersTimeMutexes[i].Lock()
  610. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  611. subscribersMutexes[i].Lock()
  612. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  613. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  614. var faultLabel string
  615. for _, f := range masterConfig.RuleOfObstacleDisplay {
  616. faultLabel = f(data)
  617. if faultLabel != "" {
  618. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  619. subscribersTimes[i] = time.Now()
  620. break
  621. }
  622. }
  623. subscribersMutexes[i].Unlock()
  624. }
  625. subscribersTimeMutexes[i].Unlock()
  626. },
  627. })
  628. }
  629. // 22
  630. if topic == masterConfig.TopicOfPjControlPub && len(masterConfig.RuleOfPjControlPub) > 0 {
  631. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  632. Node: commonConfig.RosNode,
  633. Topic: topic,
  634. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  635. // 更新共享变量
  636. mutexOfPjControlPub.RLock()
  637. {
  638. pjisuvParam.NumCountPjiControlCommandOfPjControlPub++
  639. if pjisuvParam.NumCountPjiControlCommandOfPjControlPub == 10 {
  640. pjisuvParam.EgoSteeringCmdOfPjControlPub = append(pjisuvParam.EgoSteeringCmdOfPjControlPub, data.ICPVCmdStrAngle)
  641. pjisuvParam.EgoThrottleCmdOfPjControlPub = append(pjisuvParam.EgoThrottleCmdOfPjControlPub, data.ICPVCmdAccPelPosAct)
  642. pjisuvParam.NumCountPjiControlCommandOfPjControlPub = 0
  643. }
  644. }
  645. mutexOfPjControlPub.RUnlock()
  646. subscribersTimeMutexes[i].Lock()
  647. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  648. subscribersMutexes[i].Lock()
  649. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  650. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  651. var faultLabel string
  652. for _, f := range masterConfig.RuleOfPjControlPub {
  653. faultLabel = f(data)
  654. if faultLabel != "" {
  655. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  656. subscribersTimes[i] = time.Now()
  657. break
  658. }
  659. }
  660. subscribersMutexes[i].Unlock()
  661. }
  662. subscribersTimeMutexes[i].Unlock()
  663. },
  664. })
  665. }
  666. // 23
  667. if topic == masterConfig.TopicOfPointsCluster && len(masterConfig.RuleOfPointsCluster) > 0 {
  668. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  669. Node: commonConfig.RosNode,
  670. Topic: topic,
  671. Callback: func(data *sensor_msgs.PointCloud2) {
  672. subscribersTimeMutexes[i].Lock()
  673. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  674. subscribersMutexes[i].Lock()
  675. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  676. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  677. var faultLabel string
  678. for _, f := range masterConfig.RuleOfPointsCluster {
  679. faultLabel = f(data)
  680. if faultLabel != "" {
  681. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  682. subscribersTimes[i] = time.Now()
  683. break
  684. }
  685. }
  686. subscribersMutexes[i].Unlock()
  687. }
  688. subscribersTimeMutexes[i].Unlock()
  689. },
  690. })
  691. }
  692. // 24
  693. if topic == masterConfig.TopicOfPointsConcat && len(masterConfig.RuleOfPointsConcat) > 0 {
  694. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  695. Node: commonConfig.RosNode,
  696. Topic: topic,
  697. Callback: func(data *sensor_msgs.PointCloud2) {
  698. subscribersTimeMutexes[i].Lock()
  699. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  700. subscribersMutexes[i].Lock()
  701. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  702. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  703. var faultLabel string
  704. for _, f := range masterConfig.RuleOfPointsConcat {
  705. faultLabel = f(data)
  706. if faultLabel != "" {
  707. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  708. subscribersTimes[i] = time.Now()
  709. break
  710. }
  711. }
  712. subscribersMutexes[i].Unlock()
  713. }
  714. subscribersTimeMutexes[i].Unlock()
  715. },
  716. })
  717. }
  718. // 25
  719. if topic == masterConfig.TopicOfReferenceDisplay && len(masterConfig.RuleOfReferenceDisplay) > 0 {
  720. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  721. Node: commonConfig.RosNode,
  722. Topic: topic,
  723. Callback: func(data *nav_msgs.Path) {
  724. subscribersTimeMutexes[i].Lock()
  725. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  726. subscribersMutexes[i].Lock()
  727. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  728. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  729. var faultLabel string
  730. for _, f := range masterConfig.RuleOfReferenceDisplay {
  731. faultLabel = f(data)
  732. if faultLabel != "" {
  733. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  734. subscribersTimes[i] = time.Now()
  735. break
  736. }
  737. }
  738. subscribersMutexes[i].Unlock()
  739. }
  740. subscribersTimeMutexes[i].Unlock()
  741. },
  742. })
  743. }
  744. // 26
  745. if topic == masterConfig.TopicOfReferenceTrajectory && len(masterConfig.RuleOfReferenceTrajectory) > 0 {
  746. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  747. Node: commonConfig.RosNode,
  748. Topic: topic,
  749. Callback: func(data *pjisuv_msgs.Trajectory) {
  750. subscribersTimeMutexes[i].Lock()
  751. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  752. subscribersMutexes[i].Lock()
  753. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  754. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  755. var faultLabel string
  756. for _, f := range masterConfig.RuleOfReferenceTrajectory {
  757. faultLabel = f(data)
  758. if faultLabel != "" {
  759. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  760. subscribersTimes[i] = time.Now()
  761. break
  762. }
  763. }
  764. subscribersMutexes[i].Unlock()
  765. }
  766. subscribersTimeMutexes[i].Unlock()
  767. },
  768. })
  769. }
  770. // 27
  771. if topic == masterConfig.TopicOfRoiPoints && len(masterConfig.RuleOfRoiPoints) > 0 {
  772. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  773. Node: commonConfig.RosNode,
  774. Topic: topic,
  775. Callback: func(data *sensor_msgs.PointCloud2) {
  776. subscribersTimeMutexes[i].Lock()
  777. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  778. subscribersMutexes[i].Lock()
  779. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  780. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  781. var faultLabel string
  782. for _, f := range masterConfig.RuleOfRoiPoints {
  783. faultLabel = f(data)
  784. if faultLabel != "" {
  785. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  786. subscribersTimes[i] = time.Now()
  787. break
  788. }
  789. }
  790. subscribersMutexes[i].Unlock()
  791. }
  792. subscribersTimeMutexes[i].Unlock()
  793. },
  794. })
  795. }
  796. // 28
  797. if topic == masterConfig.TopicOfRoiPolygon && len(masterConfig.RuleOfRoiPolygon) > 0 {
  798. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  799. Node: commonConfig.RosNode,
  800. Topic: topic,
  801. Callback: func(data *nav_msgs.Path) {
  802. subscribersTimeMutexes[i].Lock()
  803. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  804. subscribersMutexes[i].Lock()
  805. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  806. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  807. var faultLabel string
  808. for _, f := range masterConfig.RuleOfRoiPolygon {
  809. faultLabel = f(data)
  810. if faultLabel != "" {
  811. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  812. subscribersTimes[i] = time.Now()
  813. break
  814. }
  815. }
  816. subscribersMutexes[i].Unlock()
  817. }
  818. subscribersTimeMutexes[i].Unlock()
  819. },
  820. })
  821. }
  822. // 29
  823. if topic == masterConfig.TopicOfTf && len(masterConfig.RuleOfTf) > 0 {
  824. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  825. Node: commonConfig.RosNode,
  826. Topic: topic,
  827. Callback: func(data *tf2_msgs.TFMessage) {
  828. subscribersTimeMutexes[i].Lock()
  829. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  830. subscribersMutexes[i].Lock()
  831. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  832. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  833. var faultLabel string
  834. for _, f := range masterConfig.RuleOfTf {
  835. faultLabel = f(data)
  836. if faultLabel != "" {
  837. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  838. subscribersTimes[i] = time.Now()
  839. break
  840. }
  841. }
  842. subscribersMutexes[i].Unlock()
  843. }
  844. subscribersTimeMutexes[i].Unlock()
  845. },
  846. })
  847. }
  848. // 30
  849. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  850. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  851. Node: commonConfig.RosNode,
  852. Topic: topic,
  853. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  854. // 更新共享变量
  855. mutexOfTpperception.RLock()
  856. {
  857. for _, obj := range data.Objs {
  858. if obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  859. continue
  860. }
  861. // 检查 ObjDicOfTpperception 是否为 nil,如果是,则初始化它
  862. if pjisuvParam.ObjDicOfTpperception == nil {
  863. pjisuvParam.ObjDicOfTpperception = make(map[uint32][]float32)
  864. }
  865. if _, ok := pjisuvParam.ObjDicOfTpperception[obj.Id]; !ok {
  866. pjisuvParam.ObjDicOfTpperception[obj.Id] = []float32{}
  867. }
  868. pjisuvParam.ObjDicOfTpperception[obj.Id] = append(pjisuvParam.ObjDicOfTpperception[obj.Id], obj.Y)
  869. }
  870. }
  871. mutexOfTpperception.RUnlock()
  872. subscribersTimeMutexes[i].Lock()
  873. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  874. subscribersMutexes[i].Lock()
  875. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  876. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  877. var faultLabel string
  878. for _, f := range masterConfig.RuleOfTpperception {
  879. faultLabel = f(data, pjisuvParam)
  880. if faultLabel != "" {
  881. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  882. break
  883. }
  884. }
  885. subscribersMutexes[i].Unlock()
  886. }
  887. subscribersTimeMutexes[i].Unlock()
  888. // -------- 触发后更新共享变量
  889. mutexOfTpperception.RLock()
  890. {
  891. for _, obj := range data.Objs {
  892. pjisuvParam.ObjTypeDicOfTpperception[obj.Id] = obj.Type
  893. pjisuvParam.ObjSpeedDicOfTpperception[obj.Id] = math.Pow(math.Pow(float64(obj.Vxabs), 2)+math.Pow(float64(obj.Vyabs), 2), 0.5)
  894. }
  895. }
  896. mutexOfTpperception.RUnlock()
  897. },
  898. })
  899. }
  900. // 31
  901. if topic == masterConfig.TopicOfTpperceptionVis && len(masterConfig.RuleOfTpperceptionVis) > 0 {
  902. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  903. Node: commonConfig.RosNode,
  904. Topic: topic,
  905. Callback: func(data *visualization_msgs.MarkerArray) {
  906. subscribersTimeMutexes[i].Lock()
  907. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  908. subscribersMutexes[i].Lock()
  909. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  910. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  911. var faultLabel string
  912. for _, f := range masterConfig.RuleOfTpperceptionVis {
  913. faultLabel = f(data)
  914. if faultLabel != "" {
  915. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  916. subscribersTimes[i] = time.Now()
  917. break
  918. }
  919. }
  920. subscribersMutexes[i].Unlock()
  921. }
  922. subscribersTimeMutexes[i].Unlock()
  923. },
  924. })
  925. }
  926. // 32
  927. if topic == masterConfig.TopicOfTprouteplan && len(masterConfig.RuleOfTprouteplan) > 0 {
  928. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  929. Node: commonConfig.RosNode,
  930. Topic: topic,
  931. Callback: func(data *pjisuv_msgs.RoutePlan) {
  932. subscribersTimeMutexes[i].Lock()
  933. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  934. subscribersMutexes[i].Lock()
  935. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  936. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  937. var faultLabel string
  938. for _, f := range masterConfig.RuleOfTprouteplan {
  939. faultLabel = f(data)
  940. if faultLabel != "" {
  941. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  942. subscribersTimes[i] = time.Now()
  943. break
  944. }
  945. }
  946. subscribersMutexes[i].Unlock()
  947. }
  948. subscribersTimeMutexes[i].Unlock()
  949. },
  950. })
  951. }
  952. // 33
  953. if topic == masterConfig.TopicOfTrajectoryDisplay && len(masterConfig.RuleOfTrajectoryDisplay) > 0 {
  954. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  955. Node: commonConfig.RosNode,
  956. Topic: topic,
  957. Callback: func(data *nav_msgs.Path) {
  958. subscribersTimeMutexes[i].Lock()
  959. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  960. subscribersMutexes[i].Lock()
  961. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  962. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  963. var faultLabel string
  964. for _, f := range masterConfig.RuleOfTrajectoryDisplay {
  965. faultLabel = f(data)
  966. if faultLabel != "" {
  967. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  968. subscribersTimes[i] = time.Now()
  969. break
  970. }
  971. }
  972. subscribersMutexes[i].Unlock()
  973. }
  974. subscribersTimeMutexes[i].Unlock()
  975. },
  976. })
  977. }
  978. // 34
  979. if topic == masterConfig.TopicOfUngroundCloudpoints && len(masterConfig.RuleOfUngroundCloudpoints) > 0 {
  980. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  981. Node: commonConfig.RosNode,
  982. Topic: topic,
  983. Callback: func(data *sensor_msgs.PointCloud2) {
  984. subscribersTimeMutexes[i].Lock()
  985. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  986. subscribersMutexes[i].Lock()
  987. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  988. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  989. var faultLabel string
  990. for _, f := range masterConfig.RuleOfUngroundCloudpoints {
  991. faultLabel = f(data)
  992. if faultLabel != "" {
  993. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  994. subscribersTimes[i] = time.Now()
  995. break
  996. }
  997. }
  998. subscribersMutexes[i].Unlock()
  999. }
  1000. subscribersTimeMutexes[i].Unlock()
  1001. },
  1002. })
  1003. }
  1004. // 35
  1005. if topic == masterConfig.TopicOfCameraImage && len(masterConfig.RuleOfCameraImage) > 0 {
  1006. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1007. Node: commonConfig.RosNode,
  1008. Topic: topic,
  1009. Callback: func(data *sensor_msgs.Image) {
  1010. subscribersTimeMutexes[i].Lock()
  1011. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1012. subscribersMutexes[i].Lock()
  1013. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1014. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1015. var faultLabel string
  1016. for _, f := range masterConfig.RuleOfCameraImage {
  1017. faultLabel = f(data)
  1018. if faultLabel != "" {
  1019. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1020. subscribersTimes[i] = time.Now()
  1021. break
  1022. }
  1023. }
  1024. subscribersMutexes[i].Unlock()
  1025. }
  1026. subscribersTimeMutexes[i].Unlock()
  1027. },
  1028. })
  1029. }
  1030. // 36
  1031. if topic == masterConfig.TopicOfDataRead && len(masterConfig.RuleOfDataRead) > 0 {
  1032. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1033. Node: commonConfig.RosNode,
  1034. Topic: topic,
  1035. Callback: func(data *pjisuv_msgs.Retrieval) {
  1036. // 更新共享变量
  1037. mutexOfDataRead.RLock()
  1038. {
  1039. pjisuvParam.NumCountDataReadOfDataRead++
  1040. if pjisuvParam.NumCountDataReadOfDataRead == 10 {
  1041. pjisuvParam.EgoSteeringRealOfDataRead = append(pjisuvParam.EgoSteeringRealOfDataRead, data.ActStrWhAng)
  1042. pjisuvParam.EgoThrottleRealOfDataRead = append(pjisuvParam.EgoThrottleRealOfDataRead, data.AccPed2)
  1043. pjisuvParam.NumCountDataReadOfDataRead = 0
  1044. }
  1045. pjisuvParam.StrgAngleRealValueOfDataRead = data.ActStrWhAng
  1046. }
  1047. mutexOfDataRead.RUnlock()
  1048. subscribersTimeMutexes[i].Lock()
  1049. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1050. subscribersMutexes[i].Lock()
  1051. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1052. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1053. var faultLabel string
  1054. for _, f := range masterConfig.RuleOfDataRead {
  1055. faultLabel = f(data)
  1056. if faultLabel != "" {
  1057. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1058. subscribersTimes[i] = time.Now()
  1059. break
  1060. }
  1061. }
  1062. subscribersMutexes[i].Unlock()
  1063. }
  1064. subscribersTimeMutexes[i].Unlock()
  1065. },
  1066. })
  1067. }
  1068. // 37
  1069. if topic == masterConfig.TopicOfPjiGps && len(masterConfig.RuleOfPjiGps) > 0 {
  1070. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1071. Node: commonConfig.RosNode,
  1072. Topic: topic,
  1073. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  1074. subscribersTimeMutexes[i].Lock()
  1075. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1076. subscribersMutexes[i].Lock()
  1077. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1078. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1079. var faultLabel string
  1080. for _, f := range masterConfig.RuleOfPjiGps {
  1081. faultLabel = f(data)
  1082. if faultLabel != "" {
  1083. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1084. subscribersTimes[i] = time.Now()
  1085. break
  1086. }
  1087. }
  1088. subscribersMutexes[i].Unlock()
  1089. }
  1090. subscribersTimeMutexes[i].Unlock()
  1091. },
  1092. })
  1093. }
  1094. // 39
  1095. if topic == masterConfig.TopicOfPjVehicleFdbPub && len(masterConfig.RuleOfPjVehicleFdbPub) > 0 {
  1096. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1097. Node: commonConfig.RosNode,
  1098. Topic: topic,
  1099. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  1100. subscribersTimeMutexes[i].Lock()
  1101. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1102. subscribersMutexes[i].Lock()
  1103. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1104. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1105. var faultLabel string
  1106. for _, f := range masterConfig.RuleOfPjVehicleFdbPub {
  1107. faultLabel = f(data, &pjisuvParam)
  1108. if faultLabel != "" {
  1109. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1110. subscribersTimes[i] = time.Now()
  1111. break
  1112. }
  1113. }
  1114. subscribersMutexes[i].Unlock()
  1115. }
  1116. subscribersTimeMutexes[i].Unlock()
  1117. },
  1118. })
  1119. }
  1120. if err != nil {
  1121. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1122. //TODO 如何回传日志
  1123. continue
  1124. }
  1125. }
  1126. select {
  1127. case signal := <-service.ChannelKillWindowProducer:
  1128. if signal == 1 {
  1129. commonConfig.RosNode.Close()
  1130. service.AddKillTimes("3")
  1131. return
  1132. }
  1133. }
  1134. }
  1135. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  1136. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1137. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  1138. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  1139. newTimeWindow := commonEntity.TimeWindow{
  1140. FaultTime: faultHappenTime,
  1141. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  1142. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  1143. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  1144. Labels: []string{faultLabel},
  1145. MasterTopics: masterTopics,
  1146. SlaveTopics: slaveTopics,
  1147. }
  1148. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1149. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1150. } else {
  1151. // 2-2 如果在旧故障窗口内
  1152. commonEntity.TimeWindowProducerQueueMutex.RLock()
  1153. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  1154. // 2-2-1 更新故障窗口end时间
  1155. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  1156. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1157. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  1158. lastTimeWindow.TimeWindowEnd = maxEnd
  1159. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  1160. } else {
  1161. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1162. lastTimeWindow.TimeWindowEnd = expectEnd
  1163. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1164. }
  1165. }
  1166. // 2-2-2 更新label
  1167. labels := lastTimeWindow.Labels
  1168. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1169. // 2-2-3 更新 topic
  1170. sourceMasterTopics := lastTimeWindow.MasterTopics
  1171. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1172. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1173. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1174. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1175. }
  1176. }
  1177. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1178. // 获取所有需要采集的topic
  1179. var faultCodeTopics []string
  1180. for _, code := range commonConfig.CloudConfig.Triggers {
  1181. if code.Label == faultLabel {
  1182. faultCodeTopics = code.Topics
  1183. }
  1184. }
  1185. // 根据不同节点采集的topic进行分配采集
  1186. for _, acceptTopic := range faultCodeTopics {
  1187. for _, host := range commonConfig.CloudConfig.Hosts {
  1188. for _, topic := range host.Topics {
  1189. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1190. masterTopics = append(masterTopics, acceptTopic)
  1191. }
  1192. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1193. slaveTopics = append(slaveTopics, acceptTopic)
  1194. }
  1195. }
  1196. }
  1197. }
  1198. return masterTopics, slaveTopics
  1199. }