produce_window.go 45 KB


  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "cicv-data-closedloop/pjisuv_param"
  11. "github.com/bluenviron/goroslib/v2"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  16. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  17. "math"
  18. "sync"
  19. "time"
  20. )
  21. // 所有共享变量
  22. var (
  23. pjisuvParam pjisuv_param.PjisuvParam
  24. // /cicv_location
  25. mutexOfCicvLocation sync.RWMutex
  26. // /tpperception
  27. mutexOfTpperception sync.RWMutex
  28. // /pj_control_pub
  29. mutexOfPjControlPub sync.RWMutex
  30. // /data_read
  31. mutexOfDataRead sync.RWMutex
  32. // /pj_vehicle_fdb_pub
  33. mutexOfPjVehicleFdbPub sync.RWMutex
  34. // /pj_vehicle_fdb_pub
  35. mutexOfCicvAmrTrajectory sync.RWMutex
  36. )
  37. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  38. func PrepareTimeWindowProducerQueue() {
  39. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  40. var err error
  41. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  42. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  43. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  44. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  45. for i, topic := range commonConfig.SubscribeTopics {
  46. // 增加了可扩展性
  47. if topic == masterConfig.TopicOfCicvExtend {
  48. go func() {
  49. for {
  50. time.Sleep(time.Duration(3500) * time.Millisecond)
  51. for _, f := range masterConfig.RuleOfCicvExtend {
  52. label := f(pjisuvParam)
  53. if label != "" {
  54. saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
  55. break
  56. }
  57. }
  58. }
  59. }()
  60. }
  61. // 其他常规监听器
  62. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  63. // 1
  64. if topic == masterConfig.TopicOfAmrPose && len(masterConfig.RuleOfAmrPose) > 0 {
  65. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  66. Node: commonConfig.RosNode,
  67. Topic: topic,
  68. Callback: func(data *visualization_msgs.MarkerArray) {
  69. subscribersTimeMutexes[i].Lock()
  70. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  71. subscribersMutexes[i].Lock()
  72. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  73. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  74. var faultLabel string
  75. for _, f := range masterConfig.RuleOfAmrPose {
  76. faultLabel = f(data)
  77. if faultLabel != "" {
  78. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  79. subscribersTimes[i] = time.Now()
  80. break
  81. }
  82. }
  83. subscribersMutexes[i].Unlock()
  84. }
  85. subscribersTimeMutexes[i].Unlock()
  86. },
  87. })
  88. }
  89. // 2
  90. if topic == masterConfig.TopicOfBoundingBoxesFast && len(masterConfig.RuleOfBoundingBoxesFast) > 0 {
  91. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  92. Node: commonConfig.RosNode,
  93. Topic: topic,
  94. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  95. subscribersTimeMutexes[i].Lock()
  96. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  97. subscribersMutexes[i].Lock()
  98. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  99. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  100. var faultLabel string
  101. for _, f := range masterConfig.RuleOfBoundingBoxesFast {
  102. faultLabel = f(data)
  103. if faultLabel != "" {
  104. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  105. subscribersTimes[i] = time.Now()
  106. break
  107. }
  108. }
  109. subscribersMutexes[i].Unlock()
  110. }
  111. subscribersTimeMutexes[i].Unlock()
  112. },
  113. })
  114. }
  115. // 3
  116. if topic == masterConfig.TopicOfCameraFault && len(masterConfig.RuleOfCameraFault) > 0 {
  117. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  118. Node: commonConfig.RosNode,
  119. Topic: topic,
  120. Callback: func(data *pjisuv_msgs.FaultVec) {
  121. subscribersTimeMutexes[i].Lock()
  122. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  123. subscribersMutexes[i].Lock()
  124. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  125. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  126. var faultLabel string
  127. for _, f := range masterConfig.RuleOfCameraFault {
  128. faultLabel = f(data)
  129. if faultLabel != "" {
  130. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  131. subscribersTimes[i] = time.Now()
  132. break
  133. }
  134. }
  135. subscribersMutexes[i].Unlock()
  136. }
  137. subscribersTimeMutexes[i].Unlock()
  138. },
  139. })
  140. }
  141. // 4
  142. if topic == masterConfig.TopicOfCanData && len(masterConfig.RuleOfCanData) > 0 {
  143. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  144. Node: commonConfig.RosNode,
  145. Topic: topic,
  146. Callback: func(data *pjisuv_msgs.Frame) {
  147. subscribersTimeMutexes[i].Lock()
  148. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  149. subscribersMutexes[i].Lock()
  150. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  151. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  152. var faultLabel string
  153. for _, f := range masterConfig.RuleOfCanData {
  154. faultLabel = f(data)
  155. if faultLabel != "" {
  156. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  157. subscribersTimes[i] = time.Now()
  158. break
  159. }
  160. }
  161. subscribersMutexes[i].Unlock()
  162. }
  163. subscribersTimeMutexes[i].Unlock()
  164. },
  165. })
  166. }
  167. // 5
  168. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && len(masterConfig.RuleOfCh128x1LslidarPointCloud) > 0 {
  169. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  170. Node: commonConfig.RosNode,
  171. Topic: topic,
  172. Callback: func(data *sensor_msgs.PointCloud2) {
  173. subscribersTimeMutexes[i].Lock()
  174. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  175. subscribersMutexes[i].Lock()
  176. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  177. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  178. var faultLabel string
  179. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud {
  180. faultLabel = f(data)
  181. if faultLabel != "" {
  182. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  183. subscribersTimes[i] = time.Now()
  184. break
  185. }
  186. }
  187. subscribersMutexes[i].Unlock()
  188. }
  189. subscribersTimeMutexes[i].Unlock()
  190. },
  191. })
  192. }
  193. // 6
  194. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && len(masterConfig.RuleOfCh64wLLslidarPointCloud) > 0 {
  195. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  196. Node: commonConfig.RosNode,
  197. Topic: topic,
  198. Callback: func(data *sensor_msgs.PointCloud2) {
  199. subscribersTimeMutexes[i].Lock()
  200. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  201. subscribersMutexes[i].Lock()
  202. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  203. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  204. var faultLabel string
  205. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud {
  206. faultLabel = f(data)
  207. if faultLabel != "" {
  208. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  209. subscribersTimes[i] = time.Now()
  210. break
  211. }
  212. }
  213. subscribersMutexes[i].Unlock()
  214. }
  215. subscribersTimeMutexes[i].Unlock()
  216. },
  217. })
  218. }
  219. // 7
  220. if topic == masterConfig.TopicOfCh64wLScan && len(masterConfig.RuleOfCh64wLScan) > 0 {
  221. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  222. Node: commonConfig.RosNode,
  223. Topic: topic,
  224. Callback: func(data *sensor_msgs.LaserScan) {
  225. subscribersTimeMutexes[i].Lock()
  226. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  227. subscribersMutexes[i].Lock()
  228. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  229. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  230. var faultLabel string
  231. for _, f := range masterConfig.RuleOfCh64wLScan {
  232. faultLabel = f(data)
  233. if faultLabel != "" {
  234. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  235. subscribersTimes[i] = time.Now()
  236. break
  237. }
  238. }
  239. subscribersMutexes[i].Unlock()
  240. }
  241. subscribersTimeMutexes[i].Unlock()
  242. },
  243. })
  244. }
  245. // 8
  246. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && len(masterConfig.RuleOfCh64wRLslidarPointCloud) > 0 {
  247. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  248. Node: commonConfig.RosNode,
  249. Topic: topic,
  250. Callback: func(data *sensor_msgs.PointCloud2) {
  251. subscribersTimeMutexes[i].Lock()
  252. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  253. subscribersMutexes[i].Lock()
  254. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  255. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  256. var faultLabel string
  257. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud {
  258. faultLabel = f(data)
  259. if faultLabel != "" {
  260. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  261. subscribersTimes[i] = time.Now()
  262. break
  263. }
  264. }
  265. subscribersMutexes[i].Unlock()
  266. }
  267. subscribersTimeMutexes[i].Unlock()
  268. },
  269. })
  270. }
  271. // 9
  272. if topic == masterConfig.TopicOfCh64wRScan && len(masterConfig.RuleOfCh64wRScan) > 0 {
  273. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  274. Node: commonConfig.RosNode,
  275. Topic: topic,
  276. Callback: func(data *sensor_msgs.LaserScan) {
  277. subscribersTimeMutexes[i].Lock()
  278. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  279. subscribersMutexes[i].Lock()
  280. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  281. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  282. var faultLabel string
  283. for _, f := range masterConfig.RuleOfCh64wRScan {
  284. faultLabel = f(data)
  285. if faultLabel != "" {
  286. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  287. subscribersTimes[i] = time.Now()
  288. break
  289. }
  290. }
  291. subscribersMutexes[i].Unlock()
  292. }
  293. subscribersTimeMutexes[i].Unlock()
  294. },
  295. })
  296. }
  297. // 10
  298. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && len(masterConfig.RuleOfCicvLidarclusterMovingObjects) > 0 {
  299. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  300. Node: commonConfig.RosNode,
  301. Topic: topic,
  302. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  303. subscribersTimeMutexes[i].Lock()
  304. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  305. subscribersMutexes[i].Lock()
  306. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  307. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  308. var faultLabel string
  309. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects {
  310. faultLabel = f(data)
  311. if faultLabel != "" {
  312. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  313. subscribersTimes[i] = time.Now()
  314. break
  315. }
  316. }
  317. subscribersMutexes[i].Unlock()
  318. }
  319. subscribersTimeMutexes[i].Unlock()
  320. },
  321. })
  322. }
  323. // 11
  324. if topic == masterConfig.TopicOfCicvAmrTrajectory && len(masterConfig.RuleOfCicvAmrTrajectory) > 0 {
  325. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  326. Node: commonConfig.RosNode,
  327. Topic: topic,
  328. Callback: func(data *pjisuv_msgs.Trajectory) {
  329. subscribersTimeMutexes[i].Lock()
  330. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  331. subscribersMutexes[i].Lock()
  332. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  333. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  334. var faultLabel string
  335. for _, f := range masterConfig.RuleOfCicvAmrTrajectory {
  336. faultLabel = f(data, pjisuvParam)
  337. if faultLabel != "" {
  338. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  339. subscribersTimes[i] = time.Now()
  340. break
  341. }
  342. }
  343. subscribersMutexes[i].Unlock()
  344. }
  345. subscribersTimeMutexes[i].Unlock()
  346. // 触发后更新共享变量
  347. mutexOfCicvAmrTrajectory.RLock()
  348. {
  349. var currentCurvateres []float64
  350. for _, point := range data.Trajectoryinfo.Trajectorypoints {
  351. currentCurvateres = append(currentCurvateres, math.Abs(float64(point.Curvature)))
  352. }
  353. pjisuvParam.LastCurvaturesOfCicvAmrTrajectory = currentCurvateres
  354. }
  355. mutexOfCicvAmrTrajectory.RUnlock()
  356. },
  357. })
  358. }
  359. // 12
  360. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  361. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  362. Node: commonConfig.RosNode,
  363. Topic: topic,
  364. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  365. // 更新共享变量
  366. mutexOfCicvLocation.RLock()
  367. {
  368. pjisuvParam.VelocityYOfCicvLocation = data.VelocityX
  369. pjisuvParam.VelocityYOfCicvLocation = data.VelocityY
  370. pjisuvParam.VelocityYOfCicvLocation = data.VelocityZ
  371. pjisuvParam.YawOfCicvLocation = data.Yaw
  372. pjisuvParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
  373. pjisuvParam.PositionXOfCicvLocation = data.PositionX
  374. pjisuvParam.PositionYOfCicvLocation = data.PositionY
  375. }
  376. mutexOfCicvLocation.RUnlock()
  377. subscribersTimeMutexes[i].Lock()
  378. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  379. subscribersMutexes[i].Lock()
  380. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  381. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  382. var faultLabel string
  383. for _, f := range masterConfig.RuleOfCicvLocation {
  384. faultLabel = f(data)
  385. if faultLabel != "" {
  386. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  387. break
  388. }
  389. }
  390. subscribersMutexes[i].Unlock()
  391. }
  392. subscribersTimeMutexes[i].Unlock()
  393. },
  394. })
  395. }
  396. // 13
  397. if topic == masterConfig.TopicOfCloudClusters && len(masterConfig.RuleOfCloudClusters) > 0 {
  398. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  399. Node: commonConfig.RosNode,
  400. Topic: topic,
  401. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  402. subscribersTimeMutexes[i].Lock()
  403. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  404. subscribersMutexes[i].Lock()
  405. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  406. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  407. var faultLabel string
  408. for _, f := range masterConfig.RuleOfCloudClusters {
  409. faultLabel = f(data)
  410. if faultLabel != "" {
  411. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  412. subscribersTimes[i] = time.Now()
  413. break
  414. }
  415. }
  416. subscribersMutexes[i].Unlock()
  417. }
  418. subscribersTimeMutexes[i].Unlock()
  419. },
  420. })
  421. }
  422. // 14
  423. if topic == masterConfig.TopicOfHeartbeatInfo && len(masterConfig.RuleOfHeartbeatInfo) > 0 {
  424. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  425. Node: commonConfig.RosNode,
  426. Topic: topic,
  427. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  428. subscribersTimeMutexes[i].Lock()
  429. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  430. subscribersMutexes[i].Lock()
  431. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  432. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  433. var faultLabel string
  434. for _, f := range masterConfig.RuleOfHeartbeatInfo {
  435. faultLabel = f(data)
  436. if faultLabel != "" {
  437. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  438. subscribersTimes[i] = time.Now()
  439. break
  440. }
  441. }
  442. subscribersMutexes[i].Unlock()
  443. }
  444. subscribersTimeMutexes[i].Unlock()
  445. },
  446. })
  447. }
  448. // 15
  449. if topic == masterConfig.TopicOfLidarPretreatmentCost && len(masterConfig.RuleOfLidarPretreatmentCost) > 0 {
  450. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  451. Node: commonConfig.RosNode,
  452. Topic: topic,
  453. Callback: func(data *geometry_msgs.Vector3Stamped) {
  454. subscribersTimeMutexes[i].Lock()
  455. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  456. subscribersMutexes[i].Lock()
  457. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  458. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  459. var faultLabel string
  460. for _, f := range masterConfig.RuleOfLidarPretreatmentCost {
  461. faultLabel = f(data)
  462. if faultLabel != "" {
  463. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  464. subscribersTimes[i] = time.Now()
  465. break
  466. }
  467. }
  468. subscribersMutexes[i].Unlock()
  469. }
  470. subscribersTimeMutexes[i].Unlock()
  471. },
  472. })
  473. }
  474. // 16
  475. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && len(masterConfig.RuleOfLidarPretreatmentOdometry) > 0 {
  476. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  477. Node: commonConfig.RosNode,
  478. Topic: topic,
  479. Callback: func(data *nav_msgs.Odometry) {
  480. subscribersTimeMutexes[i].Lock()
  481. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  482. subscribersMutexes[i].Lock()
  483. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  484. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  485. var faultLabel string
  486. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry {
  487. faultLabel = f(data)
  488. if faultLabel != "" {
  489. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  490. subscribersTimes[i] = time.Now()
  491. break
  492. }
  493. }
  494. subscribersMutexes[i].Unlock()
  495. }
  496. subscribersTimeMutexes[i].Unlock()
  497. },
  498. })
  499. }
  500. // 17
  501. if topic == masterConfig.TopicOfLidarRoi && len(masterConfig.RuleOfLidarRoi) > 0 {
  502. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  503. Node: commonConfig.RosNode,
  504. Topic: topic,
  505. Callback: func(data *geometry_msgs.PolygonStamped) {
  506. subscribersTimeMutexes[i].Lock()
  507. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  508. subscribersMutexes[i].Lock()
  509. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  510. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  511. var faultLabel string
  512. for _, f := range masterConfig.RuleOfLidarRoi {
  513. faultLabel = f(data)
  514. if faultLabel != "" {
  515. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  516. subscribersTimes[i] = time.Now()
  517. break
  518. }
  519. }
  520. subscribersMutexes[i].Unlock()
  521. }
  522. subscribersTimeMutexes[i].Unlock()
  523. },
  524. })
  525. }
  526. // 18
  527. if topic == masterConfig.TopicOfLine1 && len(masterConfig.RuleOfLine1) > 0 {
  528. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  529. Node: commonConfig.RosNode,
  530. Topic: topic,
  531. Callback: func(data *nav_msgs.Path) {
  532. subscribersTimeMutexes[i].Lock()
  533. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  534. subscribersMutexes[i].Lock()
  535. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  536. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  537. var faultLabel string
  538. for _, f := range masterConfig.RuleOfLine1 {
  539. faultLabel = f(data)
  540. if faultLabel != "" {
  541. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  542. subscribersTimes[i] = time.Now()
  543. break
  544. }
  545. }
  546. subscribersMutexes[i].Unlock()
  547. }
  548. subscribersTimeMutexes[i].Unlock()
  549. },
  550. })
  551. }
  552. // 19
  553. if topic == masterConfig.TopicOfLine2 && len(masterConfig.RuleOfLine2) > 0 {
  554. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  555. Node: commonConfig.RosNode,
  556. Topic: topic,
  557. Callback: func(data *nav_msgs.Path) {
  558. subscribersTimeMutexes[i].Lock()
  559. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  560. subscribersMutexes[i].Lock()
  561. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  562. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  563. var faultLabel string
  564. for _, f := range masterConfig.RuleOfLine2 {
  565. faultLabel = f(data)
  566. if faultLabel != "" {
  567. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  568. subscribersTimes[i] = time.Now()
  569. break
  570. }
  571. }
  572. subscribersMutexes[i].Unlock()
  573. }
  574. subscribersTimeMutexes[i].Unlock()
  575. },
  576. })
  577. }
  578. // 20
  579. if topic == masterConfig.TopicOfMapPolygon && len(masterConfig.RuleOfMapPolygon) > 0 {
  580. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  581. Node: commonConfig.RosNode,
  582. Topic: topic,
  583. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  584. subscribersTimeMutexes[i].Lock()
  585. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  586. subscribersMutexes[i].Lock()
  587. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  588. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  589. var faultLabel string
  590. for _, f := range masterConfig.RuleOfMapPolygon {
  591. faultLabel = f(data)
  592. if faultLabel != "" {
  593. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  594. subscribersTimes[i] = time.Now()
  595. break
  596. }
  597. }
  598. subscribersMutexes[i].Unlock()
  599. }
  600. subscribersTimeMutexes[i].Unlock()
  601. },
  602. })
  603. }
  604. // 21
  605. if topic == masterConfig.TopicOfObstacleDisplay && len(masterConfig.RuleOfObstacleDisplay) > 0 {
  606. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  607. Node: commonConfig.RosNode,
  608. Topic: topic,
  609. Callback: func(data *visualization_msgs.MarkerArray) {
  610. subscribersTimeMutexes[i].Lock()
  611. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  612. subscribersMutexes[i].Lock()
  613. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  614. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  615. var faultLabel string
  616. for _, f := range masterConfig.RuleOfObstacleDisplay {
  617. faultLabel = f(data)
  618. if faultLabel != "" {
  619. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  620. subscribersTimes[i] = time.Now()
  621. break
  622. }
  623. }
  624. subscribersMutexes[i].Unlock()
  625. }
  626. subscribersTimeMutexes[i].Unlock()
  627. },
  628. })
  629. }
  630. // 22
  631. if topic == masterConfig.TopicOfPjControlPub && len(masterConfig.RuleOfPjControlPub) > 0 {
  632. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  633. Node: commonConfig.RosNode,
  634. Topic: topic,
  635. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  636. // 更新共享变量
  637. mutexOfPjControlPub.RLock()
  638. {
  639. pjisuvParam.NumCountPjiControlCommandOfPjControlPub++
  640. if pjisuvParam.NumCountPjiControlCommandOfPjControlPub == 10 {
  641. pjisuvParam.EgoSteeringCmdOfPjControlPub = append(pjisuvParam.EgoSteeringCmdOfPjControlPub, data.ICPVCmdStrAngle)
  642. pjisuvParam.EgoThrottleCmdOfPjControlPub = append(pjisuvParam.EgoThrottleCmdOfPjControlPub, data.ICPVCmdAccPelPosAct)
  643. pjisuvParam.NumCountPjiControlCommandOfPjControlPub = 0
  644. }
  645. }
  646. mutexOfPjControlPub.RUnlock()
  647. subscribersTimeMutexes[i].Lock()
  648. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  649. subscribersMutexes[i].Lock()
  650. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  651. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  652. var faultLabel string
  653. for _, f := range masterConfig.RuleOfPjControlPub {
  654. faultLabel = f(data)
  655. if faultLabel != "" {
  656. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  657. subscribersTimes[i] = time.Now()
  658. break
  659. }
  660. }
  661. subscribersMutexes[i].Unlock()
  662. }
  663. subscribersTimeMutexes[i].Unlock()
  664. },
  665. })
  666. }
  667. // 23
  668. if topic == masterConfig.TopicOfPointsCluster && len(masterConfig.RuleOfPointsCluster) > 0 {
  669. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  670. Node: commonConfig.RosNode,
  671. Topic: topic,
  672. Callback: func(data *sensor_msgs.PointCloud2) {
  673. subscribersTimeMutexes[i].Lock()
  674. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  675. subscribersMutexes[i].Lock()
  676. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  677. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  678. var faultLabel string
  679. for _, f := range masterConfig.RuleOfPointsCluster {
  680. faultLabel = f(data)
  681. if faultLabel != "" {
  682. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  683. subscribersTimes[i] = time.Now()
  684. break
  685. }
  686. }
  687. subscribersMutexes[i].Unlock()
  688. }
  689. subscribersTimeMutexes[i].Unlock()
  690. },
  691. })
  692. }
  693. // 24
  694. if topic == masterConfig.TopicOfPointsConcat && len(masterConfig.RuleOfPointsConcat) > 0 {
  695. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  696. Node: commonConfig.RosNode,
  697. Topic: topic,
  698. Callback: func(data *sensor_msgs.PointCloud2) {
  699. subscribersTimeMutexes[i].Lock()
  700. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  701. subscribersMutexes[i].Lock()
  702. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  703. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  704. var faultLabel string
  705. for _, f := range masterConfig.RuleOfPointsConcat {
  706. faultLabel = f(data)
  707. if faultLabel != "" {
  708. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  709. subscribersTimes[i] = time.Now()
  710. break
  711. }
  712. }
  713. subscribersMutexes[i].Unlock()
  714. }
  715. subscribersTimeMutexes[i].Unlock()
  716. },
  717. })
  718. }
  719. // 25
  720. if topic == masterConfig.TopicOfReferenceDisplay && len(masterConfig.RuleOfReferenceDisplay) > 0 {
  721. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  722. Node: commonConfig.RosNode,
  723. Topic: topic,
  724. Callback: func(data *nav_msgs.Path) {
  725. subscribersTimeMutexes[i].Lock()
  726. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  727. subscribersMutexes[i].Lock()
  728. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  729. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  730. var faultLabel string
  731. for _, f := range masterConfig.RuleOfReferenceDisplay {
  732. faultLabel = f(data)
  733. if faultLabel != "" {
  734. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  735. subscribersTimes[i] = time.Now()
  736. break
  737. }
  738. }
  739. subscribersMutexes[i].Unlock()
  740. }
  741. subscribersTimeMutexes[i].Unlock()
  742. },
  743. })
  744. }
  745. // 26
  746. if topic == masterConfig.TopicOfReferenceTrajectory && len(masterConfig.RuleOfReferenceTrajectory) > 0 {
  747. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  748. Node: commonConfig.RosNode,
  749. Topic: topic,
  750. Callback: func(data *pjisuv_msgs.Trajectory) {
  751. subscribersTimeMutexes[i].Lock()
  752. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  753. subscribersMutexes[i].Lock()
  754. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  755. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  756. var faultLabel string
  757. for _, f := range masterConfig.RuleOfReferenceTrajectory {
  758. faultLabel = f(data)
  759. if faultLabel != "" {
  760. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  761. subscribersTimes[i] = time.Now()
  762. break
  763. }
  764. }
  765. subscribersMutexes[i].Unlock()
  766. }
  767. subscribersTimeMutexes[i].Unlock()
  768. },
  769. })
  770. }
  771. // 27
  772. if topic == masterConfig.TopicOfRoiPoints && len(masterConfig.RuleOfRoiPoints) > 0 {
  773. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  774. Node: commonConfig.RosNode,
  775. Topic: topic,
  776. Callback: func(data *sensor_msgs.PointCloud2) {
  777. subscribersTimeMutexes[i].Lock()
  778. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  779. subscribersMutexes[i].Lock()
  780. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  781. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  782. var faultLabel string
  783. for _, f := range masterConfig.RuleOfRoiPoints {
  784. faultLabel = f(data)
  785. if faultLabel != "" {
  786. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  787. subscribersTimes[i] = time.Now()
  788. break
  789. }
  790. }
  791. subscribersMutexes[i].Unlock()
  792. }
  793. subscribersTimeMutexes[i].Unlock()
  794. },
  795. })
  796. }
  797. // 28
  798. if topic == masterConfig.TopicOfRoiPolygon && len(masterConfig.RuleOfRoiPolygon) > 0 {
  799. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  800. Node: commonConfig.RosNode,
  801. Topic: topic,
  802. Callback: func(data *nav_msgs.Path) {
  803. subscribersTimeMutexes[i].Lock()
  804. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  805. subscribersMutexes[i].Lock()
  806. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  807. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  808. var faultLabel string
  809. for _, f := range masterConfig.RuleOfRoiPolygon {
  810. faultLabel = f(data)
  811. if faultLabel != "" {
  812. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  813. subscribersTimes[i] = time.Now()
  814. break
  815. }
  816. }
  817. subscribersMutexes[i].Unlock()
  818. }
  819. subscribersTimeMutexes[i].Unlock()
  820. },
  821. })
  822. }
  823. // 29
  824. if topic == masterConfig.TopicOfTf && len(masterConfig.RuleOfTf) > 0 {
  825. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  826. Node: commonConfig.RosNode,
  827. Topic: topic,
  828. Callback: func(data *tf2_msgs.TFMessage) {
  829. subscribersTimeMutexes[i].Lock()
  830. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  831. subscribersMutexes[i].Lock()
  832. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  833. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  834. var faultLabel string
  835. for _, f := range masterConfig.RuleOfTf {
  836. faultLabel = f(data)
  837. if faultLabel != "" {
  838. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  839. subscribersTimes[i] = time.Now()
  840. break
  841. }
  842. }
  843. subscribersMutexes[i].Unlock()
  844. }
  845. subscribersTimeMutexes[i].Unlock()
  846. },
  847. })
  848. }
  849. // 30
  850. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  851. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  852. Node: commonConfig.RosNode,
  853. Topic: topic,
  854. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  855. // 更新共享变量
  856. mutexOfTpperception.RLock()
  857. {
  858. for _, obj := range data.Objs {
  859. if obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  860. continue
  861. }
  862. // 检查 ObjDicOfTpperception 是否为 nil,如果是,则初始化它
  863. if pjisuvParam.ObjDicOfTpperception == nil {
  864. pjisuvParam.ObjDicOfTpperception = make(map[uint32][]float32)
  865. }
  866. if _, ok := pjisuvParam.ObjDicOfTpperception[obj.Id]; !ok {
  867. pjisuvParam.ObjDicOfTpperception[obj.Id] = []float32{}
  868. }
  869. pjisuvParam.ObjDicOfTpperception[obj.Id] = append(pjisuvParam.ObjDicOfTpperception[obj.Id], obj.Y)
  870. }
  871. }
  872. mutexOfTpperception.RUnlock()
  873. subscribersTimeMutexes[i].Lock()
  874. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  875. subscribersMutexes[i].Lock()
  876. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  877. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  878. var faultLabel string
  879. for _, f := range masterConfig.RuleOfTpperception {
  880. faultLabel = f(data, pjisuvParam)
  881. if faultLabel != "" {
  882. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  883. break
  884. }
  885. }
  886. subscribersMutexes[i].Unlock()
  887. }
  888. subscribersTimeMutexes[i].Unlock()
  889. // -------- 触发后更新共享变量
  890. mutexOfTpperception.RLock()
  891. {
  892. for _, obj := range data.Objs {
  893. pjisuvParam.ObjTypeDicOfTpperception[obj.Id] = obj.Type
  894. pjisuvParam.ObjSpeedDicOfTpperception[obj.Id] = math.Pow(math.Pow(float64(obj.Vxabs), 2)+math.Pow(float64(obj.Vyabs), 2), 0.5)
  895. }
  896. }
  897. mutexOfTpperception.RUnlock()
  898. },
  899. })
  900. }
  901. // 31
  902. if topic == masterConfig.TopicOfTpperceptionVis && len(masterConfig.RuleOfTpperceptionVis) > 0 {
  903. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  904. Node: commonConfig.RosNode,
  905. Topic: topic,
  906. Callback: func(data *visualization_msgs.MarkerArray) {
  907. subscribersTimeMutexes[i].Lock()
  908. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  909. subscribersMutexes[i].Lock()
  910. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  911. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  912. var faultLabel string
  913. for _, f := range masterConfig.RuleOfTpperceptionVis {
  914. faultLabel = f(data)
  915. if faultLabel != "" {
  916. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  917. subscribersTimes[i] = time.Now()
  918. break
  919. }
  920. }
  921. subscribersMutexes[i].Unlock()
  922. }
  923. subscribersTimeMutexes[i].Unlock()
  924. },
  925. })
  926. }
  927. // 32
  928. if topic == masterConfig.TopicOfTprouteplan && len(masterConfig.RuleOfTprouteplan) > 0 {
  929. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  930. Node: commonConfig.RosNode,
  931. Topic: topic,
  932. Callback: func(data *pjisuv_msgs.RoutePlan) {
  933. subscribersTimeMutexes[i].Lock()
  934. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  935. subscribersMutexes[i].Lock()
  936. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  937. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  938. var faultLabel string
  939. for _, f := range masterConfig.RuleOfTprouteplan {
  940. faultLabel = f(data)
  941. if faultLabel != "" {
  942. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  943. subscribersTimes[i] = time.Now()
  944. break
  945. }
  946. }
  947. subscribersMutexes[i].Unlock()
  948. }
  949. subscribersTimeMutexes[i].Unlock()
  950. },
  951. })
  952. }
  953. // 33
  954. if topic == masterConfig.TopicOfTrajectoryDisplay && len(masterConfig.RuleOfTrajectoryDisplay) > 0 {
  955. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  956. Node: commonConfig.RosNode,
  957. Topic: topic,
  958. Callback: func(data *nav_msgs.Path) {
  959. subscribersTimeMutexes[i].Lock()
  960. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  961. subscribersMutexes[i].Lock()
  962. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  963. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  964. var faultLabel string
  965. for _, f := range masterConfig.RuleOfTrajectoryDisplay {
  966. faultLabel = f(data)
  967. if faultLabel != "" {
  968. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  969. subscribersTimes[i] = time.Now()
  970. break
  971. }
  972. }
  973. subscribersMutexes[i].Unlock()
  974. }
  975. subscribersTimeMutexes[i].Unlock()
  976. },
  977. })
  978. }
  979. // 34
  980. if topic == masterConfig.TopicOfUngroundCloudpoints && len(masterConfig.RuleOfUngroundCloudpoints) > 0 {
  981. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  982. Node: commonConfig.RosNode,
  983. Topic: topic,
  984. Callback: func(data *sensor_msgs.PointCloud2) {
  985. subscribersTimeMutexes[i].Lock()
  986. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  987. subscribersMutexes[i].Lock()
  988. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  989. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  990. var faultLabel string
  991. for _, f := range masterConfig.RuleOfUngroundCloudpoints {
  992. faultLabel = f(data)
  993. if faultLabel != "" {
  994. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  995. subscribersTimes[i] = time.Now()
  996. break
  997. }
  998. }
  999. subscribersMutexes[i].Unlock()
  1000. }
  1001. subscribersTimeMutexes[i].Unlock()
  1002. },
  1003. })
  1004. }
  1005. // 35
  1006. if topic == masterConfig.TopicOfCameraImage && len(masterConfig.RuleOfCameraImage) > 0 {
  1007. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1008. Node: commonConfig.RosNode,
  1009. Topic: topic,
  1010. Callback: func(data *sensor_msgs.Image) {
  1011. subscribersTimeMutexes[i].Lock()
  1012. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1013. subscribersMutexes[i].Lock()
  1014. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1015. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1016. var faultLabel string
  1017. for _, f := range masterConfig.RuleOfCameraImage {
  1018. faultLabel = f(data)
  1019. if faultLabel != "" {
  1020. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1021. subscribersTimes[i] = time.Now()
  1022. break
  1023. }
  1024. }
  1025. subscribersMutexes[i].Unlock()
  1026. }
  1027. subscribersTimeMutexes[i].Unlock()
  1028. },
  1029. })
  1030. }
  1031. // 36
  1032. if topic == masterConfig.TopicOfDataRead && len(masterConfig.RuleOfDataRead) > 0 {
  1033. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1034. Node: commonConfig.RosNode,
  1035. Topic: topic,
  1036. Callback: func(data *pjisuv_msgs.Retrieval) {
  1037. // 更新共享变量
  1038. mutexOfDataRead.RLock()
  1039. {
  1040. pjisuvParam.NumCountDataReadOfDataRead++
  1041. if pjisuvParam.NumCountDataReadOfDataRead == 10 {
  1042. pjisuvParam.EgoSteeringRealOfDataRead = append(pjisuvParam.EgoSteeringRealOfDataRead, data.ActStrWhAng)
  1043. pjisuvParam.EgoThrottleRealOfDataRead = append(pjisuvParam.EgoThrottleRealOfDataRead, data.AccPed2)
  1044. pjisuvParam.NumCountDataReadOfDataRead = 0
  1045. }
  1046. pjisuvParam.StrgAngleRealValueOfDataRead = data.ActStrWhAng
  1047. }
  1048. mutexOfDataRead.RUnlock()
  1049. subscribersTimeMutexes[i].Lock()
  1050. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1051. subscribersMutexes[i].Lock()
  1052. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1053. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1054. var faultLabel string
  1055. for _, f := range masterConfig.RuleOfDataRead {
  1056. faultLabel = f(data)
  1057. if faultLabel != "" {
  1058. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1059. subscribersTimes[i] = time.Now()
  1060. break
  1061. }
  1062. }
  1063. subscribersMutexes[i].Unlock()
  1064. }
  1065. subscribersTimeMutexes[i].Unlock()
  1066. },
  1067. })
  1068. }
  1069. // 37
  1070. if topic == masterConfig.TopicOfPjiGps && len(masterConfig.RuleOfPjiGps) > 0 {
  1071. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1072. Node: commonConfig.RosNode,
  1073. Topic: topic,
  1074. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  1075. subscribersTimeMutexes[i].Lock()
  1076. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1077. subscribersMutexes[i].Lock()
  1078. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1079. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1080. var faultLabel string
  1081. for _, f := range masterConfig.RuleOfPjiGps {
  1082. faultLabel = f(data)
  1083. if faultLabel != "" {
  1084. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1085. subscribersTimes[i] = time.Now()
  1086. break
  1087. }
  1088. }
  1089. subscribersMutexes[i].Unlock()
  1090. }
  1091. subscribersTimeMutexes[i].Unlock()
  1092. },
  1093. })
  1094. }
  1095. // 39
  1096. if topic == masterConfig.TopicOfPjVehicleFdbPub && len(masterConfig.RuleOfPjVehicleFdbPub) > 0 {
  1097. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1098. Node: commonConfig.RosNode,
  1099. Topic: topic,
  1100. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  1101. subscribersTimeMutexes[i].Lock()
  1102. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1103. subscribersMutexes[i].Lock()
  1104. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1105. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1106. var faultLabel string
  1107. for _, f := range masterConfig.RuleOfPjVehicleFdbPub {
  1108. faultLabel = f(data, &pjisuvParam)
  1109. if faultLabel != "" {
  1110. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1111. subscribersTimes[i] = time.Now()
  1112. break
  1113. }
  1114. }
  1115. subscribersMutexes[i].Unlock()
  1116. }
  1117. subscribersTimeMutexes[i].Unlock()
  1118. },
  1119. })
  1120. }
  1121. if err != nil {
  1122. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1123. //TODO 如何回传日志
  1124. continue
  1125. }
  1126. }
  1127. select {
  1128. case signal := <-service.ChannelKillWindowProducer:
  1129. if signal == 1 {
  1130. commonConfig.RosNode.Close()
  1131. service.AddKillTimes("3")
  1132. return
  1133. }
  1134. }
  1135. }
  1136. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  1137. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1138. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) { // 如果是不在旧故障窗口内,添加一个新窗口
  1139. newTimeWindow := commonEntity.TimeWindow{
  1140. FaultTime: faultHappenTime,
  1141. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  1142. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  1143. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  1144. Labels: []string{faultLabel},
  1145. MasterTopics: masterTopics,
  1146. SlaveTopics: slaveTopics,
  1147. }
  1148. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1149. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1150. } else { // 如果在旧故障窗口内
  1151. commonEntity.TimeWindowProducerQueueMutex.RLock()
  1152. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  1153. // 更新故障窗口end时间
  1154. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  1155. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1156. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  1157. lastTimeWindow.TimeWindowEnd = maxEnd
  1158. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  1159. } else {
  1160. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1161. lastTimeWindow.TimeWindowEnd = expectEnd
  1162. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1163. }
  1164. }
  1165. // 更新label
  1166. labels := lastTimeWindow.Labels
  1167. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1168. // 更新 topic
  1169. sourceMasterTopics := lastTimeWindow.MasterTopics
  1170. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1171. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1172. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1173. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1174. }
  1175. }
  1176. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1177. // 获取所有需要采集的topic
  1178. var faultCodeTopics []string
  1179. for _, code := range commonConfig.CloudConfig.Triggers {
  1180. if code.Label == faultLabel {
  1181. faultCodeTopics = code.Topics
  1182. }
  1183. }
  1184. // 根据不同节点采集的topic进行分配采集
  1185. for _, acceptTopic := range faultCodeTopics {
  1186. for _, host := range commonConfig.CloudConfig.Hosts {
  1187. for _, topic := range host.Topics {
  1188. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1189. masterTopics = append(masterTopics, acceptTopic)
  1190. }
  1191. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1192. slaveTopics = append(slaveTopics, acceptTopic)
  1193. }
  1194. }
  1195. }
  1196. }
  1197. return masterTopics, slaveTopics
  1198. }