produce_window.go 45 KB


  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "cicv-data-closedloop/pjisuv_param"
  11. "github.com/bluenviron/goroslib/v2"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  16. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  17. "math"
  18. "sync"
  19. "time"
  20. )
  21. // Shared state for the topic subscriber callbacks in this file. NOTE(review): the visible callbacks acquire these mutexes with RLock (not Lock) while writing pjisuvParam — confirm that is intended.
  22. var (
  23. pjisuvParam = pjisuv_param.PjisuvParam{ // passed to the rule functions and updated by the subscriber callbacks
  24. ObjDicOfTpperception: make(map[uint32][]float32), // keyed by obj.Id; the TopicOfTpperception callback appends obj.Y
  25. ObjTypeDicOfTpperception: make(map[uint32]uint8), // keyed by obj.Id; the TopicOfTpperception callback stores obj.Type
  26. ObjSpeedDicOfTpperception: make(map[uint32]float64),
  27. } // the mutex on the next line is the one for /cicv_location
  28. mutexOfCicvLocation sync.RWMutex
  29. // /tpperception: held while the callback updates the Obj* maps of pjisuvParam
  30. mutexOfTpperception sync.RWMutex
  31. // /pj_control_pub: held while the callback updates the command counter and the steering/throttle slices
  32. mutexOfPjControlPub sync.RWMutex
  33. // /data_read
  34. mutexOfDataRead sync.RWMutex
  35. // /pj_vehicle_fdb_pub
  36. mutexOfPjVehicleFdbPub sync.RWMutex
  37. // TopicOfCicvAmrTrajectory: held while the callback replaces LastCurvaturesOfCicvAmrTrajectory (this comment previously repeated /pj_vehicle_fdb_pub, presumably a copy-paste slip)
  38. mutexOfCicvAmrTrajectory sync.RWMutex
  39. )
  40. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  41. func PrepareTimeWindowProducerQueue() {
  42. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  43. var err error
  44. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  45. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  46. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  47. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  48. for i, topic := range commonConfig.SubscribeTopics {
  49. // 增加了可扩展性
  50. if topic == masterConfig.TopicOfCicvExtend {
  51. go func() {
  52. for {
  53. time.Sleep(time.Duration(3500) * time.Millisecond)
  54. for _, f := range masterConfig.RuleOfCicvExtend {
  55. label := f(pjisuvParam)
  56. if label != "" {
  57. saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
  58. break
  59. }
  60. }
  61. }
  62. }()
  63. }
  64. // 其他常规监听器
  65. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  66. // 1
  67. if topic == masterConfig.TopicOfAmrPose && len(masterConfig.RuleOfAmrPose) > 0 {
  68. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  69. Node: commonConfig.RosNode,
  70. Topic: topic,
  71. Callback: func(data *visualization_msgs.MarkerArray) {
  72. subscribersTimeMutexes[i].Lock()
  73. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  74. subscribersMutexes[i].Lock()
  75. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  76. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  77. var faultLabel string
  78. for _, f := range masterConfig.RuleOfAmrPose {
  79. faultLabel = f(data)
  80. if faultLabel != "" {
  81. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  82. subscribersTimes[i] = time.Now()
  83. break
  84. }
  85. }
  86. subscribersMutexes[i].Unlock()
  87. }
  88. subscribersTimeMutexes[i].Unlock()
  89. },
  90. })
  91. }
  92. // 2
  93. if topic == masterConfig.TopicOfBoundingBoxesFast && len(masterConfig.RuleOfBoundingBoxesFast) > 0 {
  94. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  95. Node: commonConfig.RosNode,
  96. Topic: topic,
  97. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  98. subscribersTimeMutexes[i].Lock()
  99. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  100. subscribersMutexes[i].Lock()
  101. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  102. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  103. var faultLabel string
  104. for _, f := range masterConfig.RuleOfBoundingBoxesFast {
  105. faultLabel = f(data)
  106. if faultLabel != "" {
  107. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  108. subscribersTimes[i] = time.Now()
  109. break
  110. }
  111. }
  112. subscribersMutexes[i].Unlock()
  113. }
  114. subscribersTimeMutexes[i].Unlock()
  115. },
  116. })
  117. }
  118. // 3
  119. if topic == masterConfig.TopicOfCameraFault && len(masterConfig.RuleOfCameraFault) > 0 {
  120. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  121. Node: commonConfig.RosNode,
  122. Topic: topic,
  123. Callback: func(data *pjisuv_msgs.FaultVec) {
  124. subscribersTimeMutexes[i].Lock()
  125. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  126. subscribersMutexes[i].Lock()
  127. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  128. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  129. var faultLabel string
  130. for _, f := range masterConfig.RuleOfCameraFault {
  131. faultLabel = f(data)
  132. if faultLabel != "" {
  133. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  134. subscribersTimes[i] = time.Now()
  135. break
  136. }
  137. }
  138. subscribersMutexes[i].Unlock()
  139. }
  140. subscribersTimeMutexes[i].Unlock()
  141. },
  142. })
  143. }
  144. // 4
  145. if topic == masterConfig.TopicOfCanData && len(masterConfig.RuleOfCanData) > 0 {
  146. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  147. Node: commonConfig.RosNode,
  148. Topic: topic,
  149. Callback: func(data *pjisuv_msgs.Frame) {
  150. subscribersTimeMutexes[i].Lock()
  151. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  152. subscribersMutexes[i].Lock()
  153. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  154. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  155. var faultLabel string
  156. for _, f := range masterConfig.RuleOfCanData {
  157. faultLabel = f(data)
  158. if faultLabel != "" {
  159. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  160. subscribersTimes[i] = time.Now()
  161. break
  162. }
  163. }
  164. subscribersMutexes[i].Unlock()
  165. }
  166. subscribersTimeMutexes[i].Unlock()
  167. },
  168. })
  169. }
  170. // 5
  171. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && len(masterConfig.RuleOfCh128x1LslidarPointCloud) > 0 {
  172. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  173. Node: commonConfig.RosNode,
  174. Topic: topic,
  175. Callback: func(data *sensor_msgs.PointCloud2) {
  176. subscribersTimeMutexes[i].Lock()
  177. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  178. subscribersMutexes[i].Lock()
  179. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  180. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  181. var faultLabel string
  182. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud {
  183. faultLabel = f(data)
  184. if faultLabel != "" {
  185. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  186. subscribersTimes[i] = time.Now()
  187. break
  188. }
  189. }
  190. subscribersMutexes[i].Unlock()
  191. }
  192. subscribersTimeMutexes[i].Unlock()
  193. },
  194. })
  195. }
  196. // 6
  197. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && len(masterConfig.RuleOfCh64wLLslidarPointCloud) > 0 {
  198. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  199. Node: commonConfig.RosNode,
  200. Topic: topic,
  201. Callback: func(data *sensor_msgs.PointCloud2) {
  202. subscribersTimeMutexes[i].Lock()
  203. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  204. subscribersMutexes[i].Lock()
  205. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  206. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  207. var faultLabel string
  208. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud {
  209. faultLabel = f(data)
  210. if faultLabel != "" {
  211. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  212. subscribersTimes[i] = time.Now()
  213. break
  214. }
  215. }
  216. subscribersMutexes[i].Unlock()
  217. }
  218. subscribersTimeMutexes[i].Unlock()
  219. },
  220. })
  221. }
  222. // 7
  223. if topic == masterConfig.TopicOfCh64wLScan && len(masterConfig.RuleOfCh64wLScan) > 0 {
  224. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  225. Node: commonConfig.RosNode,
  226. Topic: topic,
  227. Callback: func(data *sensor_msgs.LaserScan) {
  228. subscribersTimeMutexes[i].Lock()
  229. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  230. subscribersMutexes[i].Lock()
  231. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  232. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  233. var faultLabel string
  234. for _, f := range masterConfig.RuleOfCh64wLScan {
  235. faultLabel = f(data)
  236. if faultLabel != "" {
  237. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  238. subscribersTimes[i] = time.Now()
  239. break
  240. }
  241. }
  242. subscribersMutexes[i].Unlock()
  243. }
  244. subscribersTimeMutexes[i].Unlock()
  245. },
  246. })
  247. }
  248. // 8
  249. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && len(masterConfig.RuleOfCh64wRLslidarPointCloud) > 0 {
  250. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  251. Node: commonConfig.RosNode,
  252. Topic: topic,
  253. Callback: func(data *sensor_msgs.PointCloud2) {
  254. subscribersTimeMutexes[i].Lock()
  255. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  256. subscribersMutexes[i].Lock()
  257. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  258. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  259. var faultLabel string
  260. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud {
  261. faultLabel = f(data)
  262. if faultLabel != "" {
  263. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  264. subscribersTimes[i] = time.Now()
  265. break
  266. }
  267. }
  268. subscribersMutexes[i].Unlock()
  269. }
  270. subscribersTimeMutexes[i].Unlock()
  271. },
  272. })
  273. }
  274. // 9
  275. if topic == masterConfig.TopicOfCh64wRScan && len(masterConfig.RuleOfCh64wRScan) > 0 {
  276. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  277. Node: commonConfig.RosNode,
  278. Topic: topic,
  279. Callback: func(data *sensor_msgs.LaserScan) {
  280. subscribersTimeMutexes[i].Lock()
  281. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  282. subscribersMutexes[i].Lock()
  283. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  284. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  285. var faultLabel string
  286. for _, f := range masterConfig.RuleOfCh64wRScan {
  287. faultLabel = f(data)
  288. if faultLabel != "" {
  289. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  290. subscribersTimes[i] = time.Now()
  291. break
  292. }
  293. }
  294. subscribersMutexes[i].Unlock()
  295. }
  296. subscribersTimeMutexes[i].Unlock()
  297. },
  298. })
  299. }
  300. // 10
  301. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && len(masterConfig.RuleOfCicvLidarclusterMovingObjects) > 0 {
  302. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  303. Node: commonConfig.RosNode,
  304. Topic: topic,
  305. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  306. subscribersTimeMutexes[i].Lock()
  307. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  308. subscribersMutexes[i].Lock()
  309. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  310. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  311. var faultLabel string
  312. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects {
  313. faultLabel = f(data)
  314. if faultLabel != "" {
  315. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  316. subscribersTimes[i] = time.Now()
  317. break
  318. }
  319. }
  320. subscribersMutexes[i].Unlock()
  321. }
  322. subscribersTimeMutexes[i].Unlock()
  323. },
  324. })
  325. }
  326. // 11
  327. if topic == masterConfig.TopicOfCicvAmrTrajectory && len(masterConfig.RuleOfCicvAmrTrajectory) > 0 {
  328. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  329. Node: commonConfig.RosNode,
  330. Topic: topic,
  331. Callback: func(data *pjisuv_msgs.Trajectory) {
  332. subscribersTimeMutexes[i].Lock()
  333. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  334. subscribersMutexes[i].Lock()
  335. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  336. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  337. var faultLabel string
  338. for _, f := range masterConfig.RuleOfCicvAmrTrajectory {
  339. faultLabel = f(data, pjisuvParam)
  340. if faultLabel != "" {
  341. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  342. subscribersTimes[i] = time.Now()
  343. break
  344. }
  345. }
  346. subscribersMutexes[i].Unlock()
  347. }
  348. subscribersTimeMutexes[i].Unlock()
  349. // 触发后更新共享变量
  350. mutexOfCicvAmrTrajectory.RLock()
  351. {
  352. var currentCurvateres []float64
  353. for _, point := range data.Trajectoryinfo.Trajectorypoints {
  354. currentCurvateres = append(currentCurvateres, math.Abs(float64(point.Curvature)))
  355. }
  356. pjisuvParam.LastCurvaturesOfCicvAmrTrajectory = currentCurvateres
  357. }
  358. mutexOfCicvAmrTrajectory.RUnlock()
  359. },
  360. })
  361. }
  362. // 12
  363. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  364. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  365. Node: commonConfig.RosNode,
  366. Topic: topic,
  367. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  368. // 更新共享变量
  369. mutexOfCicvLocation.RLock()
  370. {
  371. pjisuvParam.VelocityXOfCicvLocation = data.VelocityX
  372. pjisuvParam.VelocityYOfCicvLocation = data.VelocityY
  373. pjisuvParam.VelocityZOfCicvLocation = data.VelocityZ
  374. pjisuvParam.YawOfCicvLocation = data.Yaw
  375. pjisuvParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
  376. pjisuvParam.PositionXOfCicvLocation = data.PositionX
  377. pjisuvParam.PositionYOfCicvLocation = data.PositionY
  378. }
  379. mutexOfCicvLocation.RUnlock()
  380. subscribersTimeMutexes[i].Lock()
  381. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  382. subscribersMutexes[i].Lock()
  383. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  384. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  385. var faultLabel string
  386. for _, f := range masterConfig.RuleOfCicvLocation {
  387. faultLabel = f(data, pjisuvParam)
  388. if faultLabel != "" {
  389. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  390. break
  391. }
  392. }
  393. subscribersMutexes[i].Unlock()
  394. }
  395. subscribersTimeMutexes[i].Unlock()
  396. },
  397. })
  398. }
  399. // 13
  400. if topic == masterConfig.TopicOfCloudClusters && len(masterConfig.RuleOfCloudClusters) > 0 {
  401. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  402. Node: commonConfig.RosNode,
  403. Topic: topic,
  404. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  405. subscribersTimeMutexes[i].Lock()
  406. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  407. subscribersMutexes[i].Lock()
  408. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  409. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  410. var faultLabel string
  411. for _, f := range masterConfig.RuleOfCloudClusters {
  412. faultLabel = f(data)
  413. if faultLabel != "" {
  414. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  415. subscribersTimes[i] = time.Now()
  416. break
  417. }
  418. }
  419. subscribersMutexes[i].Unlock()
  420. }
  421. subscribersTimeMutexes[i].Unlock()
  422. },
  423. })
  424. }
  425. // 14
  426. if topic == masterConfig.TopicOfHeartbeatInfo && len(masterConfig.RuleOfHeartbeatInfo) > 0 {
  427. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  428. Node: commonConfig.RosNode,
  429. Topic: topic,
  430. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  431. subscribersTimeMutexes[i].Lock()
  432. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  433. subscribersMutexes[i].Lock()
  434. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  435. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  436. var faultLabel string
  437. for _, f := range masterConfig.RuleOfHeartbeatInfo {
  438. faultLabel = f(data)
  439. if faultLabel != "" {
  440. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  441. subscribersTimes[i] = time.Now()
  442. break
  443. }
  444. }
  445. subscribersMutexes[i].Unlock()
  446. }
  447. subscribersTimeMutexes[i].Unlock()
  448. },
  449. })
  450. }
  451. // 15
  452. if topic == masterConfig.TopicOfLidarPretreatmentCost && len(masterConfig.RuleOfLidarPretreatmentCost) > 0 {
  453. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  454. Node: commonConfig.RosNode,
  455. Topic: topic,
  456. Callback: func(data *geometry_msgs.Vector3Stamped) {
  457. subscribersTimeMutexes[i].Lock()
  458. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  459. subscribersMutexes[i].Lock()
  460. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  461. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  462. var faultLabel string
  463. for _, f := range masterConfig.RuleOfLidarPretreatmentCost {
  464. faultLabel = f(data)
  465. if faultLabel != "" {
  466. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  467. subscribersTimes[i] = time.Now()
  468. break
  469. }
  470. }
  471. subscribersMutexes[i].Unlock()
  472. }
  473. subscribersTimeMutexes[i].Unlock()
  474. },
  475. })
  476. }
  477. // 16
  478. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && len(masterConfig.RuleOfLidarPretreatmentOdometry) > 0 {
  479. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  480. Node: commonConfig.RosNode,
  481. Topic: topic,
  482. Callback: func(data *nav_msgs.Odometry) {
  483. subscribersTimeMutexes[i].Lock()
  484. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  485. subscribersMutexes[i].Lock()
  486. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  487. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  488. var faultLabel string
  489. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry {
  490. faultLabel = f(data)
  491. if faultLabel != "" {
  492. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  493. subscribersTimes[i] = time.Now()
  494. break
  495. }
  496. }
  497. subscribersMutexes[i].Unlock()
  498. }
  499. subscribersTimeMutexes[i].Unlock()
  500. },
  501. })
  502. }
  503. // 17
  504. if topic == masterConfig.TopicOfLidarRoi && len(masterConfig.RuleOfLidarRoi) > 0 {
  505. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  506. Node: commonConfig.RosNode,
  507. Topic: topic,
  508. Callback: func(data *geometry_msgs.PolygonStamped) {
  509. subscribersTimeMutexes[i].Lock()
  510. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  511. subscribersMutexes[i].Lock()
  512. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  513. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  514. var faultLabel string
  515. for _, f := range masterConfig.RuleOfLidarRoi {
  516. faultLabel = f(data)
  517. if faultLabel != "" {
  518. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  519. subscribersTimes[i] = time.Now()
  520. break
  521. }
  522. }
  523. subscribersMutexes[i].Unlock()
  524. }
  525. subscribersTimeMutexes[i].Unlock()
  526. },
  527. })
  528. }
  529. // 18
  530. if topic == masterConfig.TopicOfLine1 && len(masterConfig.RuleOfLine1) > 0 {
  531. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  532. Node: commonConfig.RosNode,
  533. Topic: topic,
  534. Callback: func(data *nav_msgs.Path) {
  535. subscribersTimeMutexes[i].Lock()
  536. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  537. subscribersMutexes[i].Lock()
  538. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  539. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  540. var faultLabel string
  541. for _, f := range masterConfig.RuleOfLine1 {
  542. faultLabel = f(data)
  543. if faultLabel != "" {
  544. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  545. subscribersTimes[i] = time.Now()
  546. break
  547. }
  548. }
  549. subscribersMutexes[i].Unlock()
  550. }
  551. subscribersTimeMutexes[i].Unlock()
  552. },
  553. })
  554. }
  555. // 19
  556. if topic == masterConfig.TopicOfLine2 && len(masterConfig.RuleOfLine2) > 0 {
  557. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  558. Node: commonConfig.RosNode,
  559. Topic: topic,
  560. Callback: func(data *nav_msgs.Path) {
  561. subscribersTimeMutexes[i].Lock()
  562. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  563. subscribersMutexes[i].Lock()
  564. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  565. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  566. var faultLabel string
  567. for _, f := range masterConfig.RuleOfLine2 {
  568. faultLabel = f(data)
  569. if faultLabel != "" {
  570. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  571. subscribersTimes[i] = time.Now()
  572. break
  573. }
  574. }
  575. subscribersMutexes[i].Unlock()
  576. }
  577. subscribersTimeMutexes[i].Unlock()
  578. },
  579. })
  580. }
  581. // 20
  582. if topic == masterConfig.TopicOfMapPolygon && len(masterConfig.RuleOfMapPolygon) > 0 {
  583. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  584. Node: commonConfig.RosNode,
  585. Topic: topic,
  586. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  587. subscribersTimeMutexes[i].Lock()
  588. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  589. subscribersMutexes[i].Lock()
  590. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  591. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  592. var faultLabel string
  593. for _, f := range masterConfig.RuleOfMapPolygon {
  594. faultLabel = f(data, pjisuvParam)
  595. if faultLabel != "" {
  596. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  597. subscribersTimes[i] = time.Now()
  598. break
  599. }
  600. }
  601. subscribersMutexes[i].Unlock()
  602. }
  603. subscribersTimeMutexes[i].Unlock()
  604. },
  605. })
  606. }
  607. // 21
  608. if topic == masterConfig.TopicOfObstacleDisplay && len(masterConfig.RuleOfObstacleDisplay) > 0 {
  609. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  610. Node: commonConfig.RosNode,
  611. Topic: topic,
  612. Callback: func(data *visualization_msgs.MarkerArray) {
  613. subscribersTimeMutexes[i].Lock()
  614. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  615. subscribersMutexes[i].Lock()
  616. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  617. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  618. var faultLabel string
  619. for _, f := range masterConfig.RuleOfObstacleDisplay {
  620. faultLabel = f(data)
  621. if faultLabel != "" {
  622. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  623. subscribersTimes[i] = time.Now()
  624. break
  625. }
  626. }
  627. subscribersMutexes[i].Unlock()
  628. }
  629. subscribersTimeMutexes[i].Unlock()
  630. },
  631. })
  632. }
  633. // 22
  634. if topic == masterConfig.TopicOfPjControlPub && len(masterConfig.RuleOfPjControlPub) > 0 {
  635. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  636. Node: commonConfig.RosNode,
  637. Topic: topic,
  638. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  639. // 更新共享变量
  640. mutexOfPjControlPub.RLock()
  641. {
  642. pjisuvParam.NumCountPjiControlCommandOfPjControlPub++
  643. if pjisuvParam.NumCountPjiControlCommandOfPjControlPub == 10 {
  644. pjisuvParam.EgoSteeringCmdOfPjControlPub = append(pjisuvParam.EgoSteeringCmdOfPjControlPub, data.ICPVCmdStrAngle)
  645. pjisuvParam.EgoThrottleCmdOfPjControlPub = append(pjisuvParam.EgoThrottleCmdOfPjControlPub, data.ICPVCmdAccPelPosAct)
  646. pjisuvParam.NumCountPjiControlCommandOfPjControlPub = 0
  647. }
  648. }
  649. mutexOfPjControlPub.RUnlock()
  650. subscribersTimeMutexes[i].Lock()
  651. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  652. subscribersMutexes[i].Lock()
  653. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  654. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  655. var faultLabel string
  656. for _, f := range masterConfig.RuleOfPjControlPub {
  657. faultLabel = f(data)
  658. if faultLabel != "" {
  659. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  660. subscribersTimes[i] = time.Now()
  661. break
  662. }
  663. }
  664. subscribersMutexes[i].Unlock()
  665. }
  666. subscribersTimeMutexes[i].Unlock()
  667. },
  668. })
  669. }
  670. // 23
  671. if topic == masterConfig.TopicOfPointsCluster && len(masterConfig.RuleOfPointsCluster) > 0 {
  672. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  673. Node: commonConfig.RosNode,
  674. Topic: topic,
  675. Callback: func(data *sensor_msgs.PointCloud2) {
  676. subscribersTimeMutexes[i].Lock()
  677. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  678. subscribersMutexes[i].Lock()
  679. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  680. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  681. var faultLabel string
  682. for _, f := range masterConfig.RuleOfPointsCluster {
  683. faultLabel = f(data)
  684. if faultLabel != "" {
  685. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  686. subscribersTimes[i] = time.Now()
  687. break
  688. }
  689. }
  690. subscribersMutexes[i].Unlock()
  691. }
  692. subscribersTimeMutexes[i].Unlock()
  693. },
  694. })
  695. }
  696. // 24
  697. if topic == masterConfig.TopicOfPointsConcat && len(masterConfig.RuleOfPointsConcat) > 0 {
  698. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  699. Node: commonConfig.RosNode,
  700. Topic: topic,
  701. Callback: func(data *sensor_msgs.PointCloud2) {
  702. subscribersTimeMutexes[i].Lock()
  703. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  704. subscribersMutexes[i].Lock()
  705. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  706. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  707. var faultLabel string
  708. for _, f := range masterConfig.RuleOfPointsConcat {
  709. faultLabel = f(data)
  710. if faultLabel != "" {
  711. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  712. subscribersTimes[i] = time.Now()
  713. break
  714. }
  715. }
  716. subscribersMutexes[i].Unlock()
  717. }
  718. subscribersTimeMutexes[i].Unlock()
  719. },
  720. })
  721. }
  722. // 25
  723. if topic == masterConfig.TopicOfReferenceDisplay && len(masterConfig.RuleOfReferenceDisplay) > 0 {
  724. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  725. Node: commonConfig.RosNode,
  726. Topic: topic,
  727. Callback: func(data *nav_msgs.Path) {
  728. subscribersTimeMutexes[i].Lock()
  729. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  730. subscribersMutexes[i].Lock()
  731. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  732. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  733. var faultLabel string
  734. for _, f := range masterConfig.RuleOfReferenceDisplay {
  735. faultLabel = f(data)
  736. if faultLabel != "" {
  737. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  738. subscribersTimes[i] = time.Now()
  739. break
  740. }
  741. }
  742. subscribersMutexes[i].Unlock()
  743. }
  744. subscribersTimeMutexes[i].Unlock()
  745. },
  746. })
  747. }
  748. // 26
  749. if topic == masterConfig.TopicOfReferenceTrajectory && len(masterConfig.RuleOfReferenceTrajectory) > 0 {
  750. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  751. Node: commonConfig.RosNode,
  752. Topic: topic,
  753. Callback: func(data *pjisuv_msgs.Trajectory) {
  754. subscribersTimeMutexes[i].Lock()
  755. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  756. subscribersMutexes[i].Lock()
  757. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  758. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  759. var faultLabel string
  760. for _, f := range masterConfig.RuleOfReferenceTrajectory {
  761. faultLabel = f(data)
  762. if faultLabel != "" {
  763. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  764. subscribersTimes[i] = time.Now()
  765. break
  766. }
  767. }
  768. subscribersMutexes[i].Unlock()
  769. }
  770. subscribersTimeMutexes[i].Unlock()
  771. },
  772. })
  773. }
  774. // 27
  775. if topic == masterConfig.TopicOfRoiPoints && len(masterConfig.RuleOfRoiPoints) > 0 {
  776. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  777. Node: commonConfig.RosNode,
  778. Topic: topic,
  779. Callback: func(data *sensor_msgs.PointCloud2) {
  780. subscribersTimeMutexes[i].Lock()
  781. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  782. subscribersMutexes[i].Lock()
  783. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  784. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  785. var faultLabel string
  786. for _, f := range masterConfig.RuleOfRoiPoints {
  787. faultLabel = f(data)
  788. if faultLabel != "" {
  789. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  790. subscribersTimes[i] = time.Now()
  791. break
  792. }
  793. }
  794. subscribersMutexes[i].Unlock()
  795. }
  796. subscribersTimeMutexes[i].Unlock()
  797. },
  798. })
  799. }
  800. // 28
  801. if topic == masterConfig.TopicOfRoiPolygon && len(masterConfig.RuleOfRoiPolygon) > 0 {
  802. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  803. Node: commonConfig.RosNode,
  804. Topic: topic,
  805. Callback: func(data *nav_msgs.Path) {
  806. subscribersTimeMutexes[i].Lock()
  807. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  808. subscribersMutexes[i].Lock()
  809. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  810. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  811. var faultLabel string
  812. for _, f := range masterConfig.RuleOfRoiPolygon {
  813. faultLabel = f(data)
  814. if faultLabel != "" {
  815. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  816. subscribersTimes[i] = time.Now()
  817. break
  818. }
  819. }
  820. subscribersMutexes[i].Unlock()
  821. }
  822. subscribersTimeMutexes[i].Unlock()
  823. },
  824. })
  825. }
  826. // 29
  827. if topic == masterConfig.TopicOfTf && len(masterConfig.RuleOfTf) > 0 {
  828. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  829. Node: commonConfig.RosNode,
  830. Topic: topic,
  831. Callback: func(data *tf2_msgs.TFMessage) {
  832. subscribersTimeMutexes[i].Lock()
  833. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  834. subscribersMutexes[i].Lock()
  835. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  836. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  837. var faultLabel string
  838. for _, f := range masterConfig.RuleOfTf {
  839. faultLabel = f(data)
  840. if faultLabel != "" {
  841. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  842. subscribersTimes[i] = time.Now()
  843. break
  844. }
  845. }
  846. subscribersMutexes[i].Unlock()
  847. }
  848. subscribersTimeMutexes[i].Unlock()
  849. },
  850. })
  851. }
  852. // 30
  853. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  854. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  855. Node: commonConfig.RosNode,
  856. Topic: topic,
  857. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  858. // 更新共享变量
  859. mutexOfTpperception.RLock()
  860. {
  861. for _, obj := range data.Objs {
  862. if obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  863. continue
  864. }
  865. // 检查 ObjDicOfTpperception 是否为 nil,如果是,则初始化它
  866. if pjisuvParam.ObjDicOfTpperception == nil {
  867. pjisuvParam.ObjDicOfTpperception = make(map[uint32][]float32)
  868. }
  869. if _, ok := pjisuvParam.ObjDicOfTpperception[obj.Id]; !ok {
  870. pjisuvParam.ObjDicOfTpperception[obj.Id] = []float32{}
  871. }
  872. pjisuvParam.ObjDicOfTpperception[obj.Id] = append(pjisuvParam.ObjDicOfTpperception[obj.Id], obj.Y)
  873. }
  874. }
  875. mutexOfTpperception.RUnlock()
  876. subscribersTimeMutexes[i].Lock()
  877. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  878. subscribersMutexes[i].Lock()
  879. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  880. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  881. var faultLabel string
  882. for _, f := range masterConfig.RuleOfTpperception {
  883. faultLabel = f(data, pjisuvParam)
  884. if faultLabel != "" {
  885. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  886. break
  887. }
  888. }
  889. subscribersMutexes[i].Unlock()
  890. }
  891. subscribersTimeMutexes[i].Unlock()
  892. // -------- 触发后更新共享变量
  893. mutexOfTpperception.RLock()
  894. {
  895. for _, obj := range data.Objs {
  896. pjisuvParam.ObjTypeDicOfTpperception[obj.Id] = obj.Type
  897. pjisuvParam.ObjSpeedDicOfTpperception[obj.Id] = math.Pow(math.Pow(float64(obj.Vxabs), 2)+math.Pow(float64(obj.Vyabs), 2), 0.5)
  898. }
  899. }
  900. mutexOfTpperception.RUnlock()
  901. },
  902. })
  903. }
  904. // 31
  905. if topic == masterConfig.TopicOfTpperceptionVis && len(masterConfig.RuleOfTpperceptionVis) > 0 {
  906. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  907. Node: commonConfig.RosNode,
  908. Topic: topic,
  909. Callback: func(data *visualization_msgs.MarkerArray) {
  910. subscribersTimeMutexes[i].Lock()
  911. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  912. subscribersMutexes[i].Lock()
  913. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  914. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  915. var faultLabel string
  916. for _, f := range masterConfig.RuleOfTpperceptionVis {
  917. faultLabel = f(data)
  918. if faultLabel != "" {
  919. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  920. subscribersTimes[i] = time.Now()
  921. break
  922. }
  923. }
  924. subscribersMutexes[i].Unlock()
  925. }
  926. subscribersTimeMutexes[i].Unlock()
  927. },
  928. })
  929. }
  930. // 32
  931. if topic == masterConfig.TopicOfTprouteplan && len(masterConfig.RuleOfTprouteplan) > 0 {
  932. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  933. Node: commonConfig.RosNode,
  934. Topic: topic,
  935. Callback: func(data *pjisuv_msgs.RoutePlan) {
  936. subscribersTimeMutexes[i].Lock()
  937. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  938. subscribersMutexes[i].Lock()
  939. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  940. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  941. var faultLabel string
  942. for _, f := range masterConfig.RuleOfTprouteplan {
  943. faultLabel = f(data)
  944. if faultLabel != "" {
  945. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  946. subscribersTimes[i] = time.Now()
  947. break
  948. }
  949. }
  950. subscribersMutexes[i].Unlock()
  951. }
  952. subscribersTimeMutexes[i].Unlock()
  953. },
  954. })
  955. }
  956. // 33
  957. if topic == masterConfig.TopicOfTrajectoryDisplay && len(masterConfig.RuleOfTrajectoryDisplay) > 0 {
  958. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  959. Node: commonConfig.RosNode,
  960. Topic: topic,
  961. Callback: func(data *nav_msgs.Path) {
  962. subscribersTimeMutexes[i].Lock()
  963. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  964. subscribersMutexes[i].Lock()
  965. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  966. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  967. var faultLabel string
  968. for _, f := range masterConfig.RuleOfTrajectoryDisplay {
  969. faultLabel = f(data)
  970. if faultLabel != "" {
  971. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  972. subscribersTimes[i] = time.Now()
  973. break
  974. }
  975. }
  976. subscribersMutexes[i].Unlock()
  977. }
  978. subscribersTimeMutexes[i].Unlock()
  979. },
  980. })
  981. }
  982. // 34
  983. if topic == masterConfig.TopicOfUngroundCloudpoints && len(masterConfig.RuleOfUngroundCloudpoints) > 0 {
  984. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  985. Node: commonConfig.RosNode,
  986. Topic: topic,
  987. Callback: func(data *sensor_msgs.PointCloud2) {
  988. subscribersTimeMutexes[i].Lock()
  989. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  990. subscribersMutexes[i].Lock()
  991. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  992. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  993. var faultLabel string
  994. for _, f := range masterConfig.RuleOfUngroundCloudpoints {
  995. faultLabel = f(data)
  996. if faultLabel != "" {
  997. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  998. subscribersTimes[i] = time.Now()
  999. break
  1000. }
  1001. }
  1002. subscribersMutexes[i].Unlock()
  1003. }
  1004. subscribersTimeMutexes[i].Unlock()
  1005. },
  1006. })
  1007. }
  1008. // 35
  1009. if topic == masterConfig.TopicOfCameraImage && len(masterConfig.RuleOfCameraImage) > 0 {
  1010. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1011. Node: commonConfig.RosNode,
  1012. Topic: topic,
  1013. Callback: func(data *sensor_msgs.Image) {
  1014. subscribersTimeMutexes[i].Lock()
  1015. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1016. subscribersMutexes[i].Lock()
  1017. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1018. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1019. var faultLabel string
  1020. for _, f := range masterConfig.RuleOfCameraImage {
  1021. faultLabel = f(data)
  1022. if faultLabel != "" {
  1023. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1024. subscribersTimes[i] = time.Now()
  1025. break
  1026. }
  1027. }
  1028. subscribersMutexes[i].Unlock()
  1029. }
  1030. subscribersTimeMutexes[i].Unlock()
  1031. },
  1032. })
  1033. }
  1034. // 36
  1035. if topic == masterConfig.TopicOfDataRead && len(masterConfig.RuleOfDataRead) > 0 {
  1036. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1037. Node: commonConfig.RosNode,
  1038. Topic: topic,
  1039. Callback: func(data *pjisuv_msgs.Retrieval) {
  1040. // 更新共享变量
  1041. mutexOfDataRead.RLock()
  1042. {
  1043. pjisuvParam.NumCountDataReadOfDataRead++
  1044. if pjisuvParam.NumCountDataReadOfDataRead == 10 {
  1045. pjisuvParam.EgoSteeringRealOfDataRead = append(pjisuvParam.EgoSteeringRealOfDataRead, data.ActStrWhAng)
  1046. pjisuvParam.EgoThrottleRealOfDataRead = append(pjisuvParam.EgoThrottleRealOfDataRead, data.AccPed2)
  1047. pjisuvParam.NumCountDataReadOfDataRead = 0
  1048. }
  1049. pjisuvParam.StrgAngleRealValueOfDataRead = data.ActStrWhAng
  1050. }
  1051. mutexOfDataRead.RUnlock()
  1052. subscribersTimeMutexes[i].Lock()
  1053. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1054. subscribersMutexes[i].Lock()
  1055. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1056. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1057. var faultLabel string
  1058. for _, f := range masterConfig.RuleOfDataRead {
  1059. faultLabel = f(data)
  1060. if faultLabel != "" {
  1061. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1062. subscribersTimes[i] = time.Now()
  1063. break
  1064. }
  1065. }
  1066. subscribersMutexes[i].Unlock()
  1067. }
  1068. subscribersTimeMutexes[i].Unlock()
  1069. },
  1070. })
  1071. }
  1072. // 37
  1073. if topic == masterConfig.TopicOfPjiGps && len(masterConfig.RuleOfPjiGps) > 0 {
  1074. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1075. Node: commonConfig.RosNode,
  1076. Topic: topic,
  1077. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  1078. subscribersTimeMutexes[i].Lock()
  1079. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1080. subscribersMutexes[i].Lock()
  1081. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1082. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1083. var faultLabel string
  1084. for _, f := range masterConfig.RuleOfPjiGps {
  1085. faultLabel = f(data)
  1086. if faultLabel != "" {
  1087. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1088. subscribersTimes[i] = time.Now()
  1089. break
  1090. }
  1091. }
  1092. subscribersMutexes[i].Unlock()
  1093. }
  1094. subscribersTimeMutexes[i].Unlock()
  1095. },
  1096. })
  1097. }
  1098. // 39
  1099. if topic == masterConfig.TopicOfPjVehicleFdbPub && len(masterConfig.RuleOfPjVehicleFdbPub) > 0 {
  1100. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1101. Node: commonConfig.RosNode,
  1102. Topic: topic,
  1103. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  1104. subscribersTimeMutexes[i].Lock()
  1105. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1106. subscribersMutexes[i].Lock()
  1107. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1108. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1109. var faultLabel string
  1110. for _, f := range masterConfig.RuleOfPjVehicleFdbPub {
  1111. faultLabel = f(data, &pjisuvParam)
  1112. if faultLabel != "" {
  1113. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1114. subscribersTimes[i] = time.Now()
  1115. break
  1116. }
  1117. }
  1118. subscribersMutexes[i].Unlock()
  1119. }
  1120. subscribersTimeMutexes[i].Unlock()
  1121. },
  1122. })
  1123. }
  1124. if err != nil {
  1125. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1126. //TODO 如何回传日志
  1127. continue
  1128. }
  1129. }
  1130. select {
  1131. case signal := <-service.ChannelKillWindowProducer:
  1132. if signal == 1 {
  1133. commonConfig.RosNode.Close()
  1134. service.AddKillTimes("3")
  1135. return
  1136. }
  1137. }
  1138. }
  1139. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  1140. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1141. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) { // 如果是不在旧故障窗口内,添加一个新窗口
  1142. newTimeWindow := commonEntity.TimeWindow{
  1143. FaultTime: faultHappenTime,
  1144. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  1145. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  1146. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  1147. Labels: []string{faultLabel},
  1148. MasterTopics: masterTopics,
  1149. SlaveTopics: slaveTopics,
  1150. }
  1151. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1152. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1153. } else { // 如果在旧故障窗口内
  1154. commonEntity.TimeWindowProducerQueueMutex.RLock()
  1155. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  1156. // 更新故障窗口end时间
  1157. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  1158. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1159. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  1160. lastTimeWindow.TimeWindowEnd = maxEnd
  1161. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  1162. } else {
  1163. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1164. lastTimeWindow.TimeWindowEnd = expectEnd
  1165. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1166. }
  1167. }
  1168. // 更新label
  1169. labels := lastTimeWindow.Labels
  1170. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1171. // 更新 topic
  1172. sourceMasterTopics := lastTimeWindow.MasterTopics
  1173. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1174. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1175. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1176. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1177. }
  1178. }
  1179. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1180. // 获取所有需要采集的topic
  1181. var faultCodeTopics []string
  1182. for _, code := range commonConfig.CloudConfig.Triggers {
  1183. if code.Label == faultLabel {
  1184. faultCodeTopics = code.Topics
  1185. }
  1186. }
  1187. // 根据不同节点采集的topic进行分配采集
  1188. for _, acceptTopic := range faultCodeTopics {
  1189. for _, host := range commonConfig.CloudConfig.Hosts {
  1190. for _, topic := range host.Topics {
  1191. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1192. masterTopics = append(masterTopics, acceptTopic)
  1193. }
  1194. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1195. slaveTopics = append(slaveTopics, acceptTopic)
  1196. }
  1197. }
  1198. }
  1199. }
  1200. return masterTopics, slaveTopics
  1201. }