produce_window.go 44 KB


  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "github.com/bluenviron/goroslib/v2"
  11. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  16. "math"
  17. "sync"
  18. "time"
  19. )
  20. // 所有共享变量
  21. var (
  22. pjisuvParam commonEntity.PjisuvParam
  23. // /cicv_location
  24. mutexOfCicvLocation sync.RWMutex
  25. // /tpperception
  26. mutexOfTpperception sync.RWMutex
  27. // /pj_control_pub
  28. mutexOfPjControlPub sync.RWMutex
  29. // /data_read
  30. mutexOfDataRead sync.RWMutex
  31. // /pj_vehicle_fdb_pub
  32. mutexOfPjVehicleFdbPub sync.RWMutex
  33. )
  34. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  35. func PrepareTimeWindowProducerQueue() {
  36. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  37. var err error
  38. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  39. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  40. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  41. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  42. for i, topic := range commonConfig.SubscribeTopics {
  43. // !!!扩展的定时任务监听,牛逼的设计!!!扩展性拉满啦
  44. if topic == masterConfig.TopicOfCicvExtend {
  45. go func() {
  46. for {
  47. time.Sleep(time.Duration(3500) * time.Millisecond)
  48. for _, f := range masterConfig.RuleOfCicvExtend {
  49. label := f(pjisuvParam)
  50. if label != "" {
  51. saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
  52. break
  53. }
  54. }
  55. }
  56. }()
  57. }
  58. // 其他常规监听器
  59. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  60. // 1
  61. if topic == masterConfig.TopicOfAmrPose && len(masterConfig.RuleOfAmrPose) > 0 {
  62. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  63. Node: commonConfig.RosNode,
  64. Topic: topic,
  65. Callback: func(data *visualization_msgs.MarkerArray) {
  66. subscribersTimeMutexes[i].Lock()
  67. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  68. subscribersMutexes[i].Lock()
  69. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  70. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  71. var faultLabel string
  72. for _, f := range masterConfig.RuleOfAmrPose {
  73. faultLabel = f(data)
  74. if faultLabel != "" {
  75. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  76. subscribersTimes[i] = time.Now()
  77. break
  78. }
  79. }
  80. subscribersMutexes[i].Unlock()
  81. }
  82. subscribersTimeMutexes[i].Unlock()
  83. },
  84. })
  85. }
  86. // 2
  87. if topic == masterConfig.TopicOfBoundingBoxesFast && len(masterConfig.RuleOfBoundingBoxesFast) > 0 {
  88. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  89. Node: commonConfig.RosNode,
  90. Topic: topic,
  91. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  92. subscribersTimeMutexes[i].Lock()
  93. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  94. subscribersMutexes[i].Lock()
  95. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  96. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  97. var faultLabel string
  98. for _, f := range masterConfig.RuleOfBoundingBoxesFast {
  99. faultLabel = f(data)
  100. if faultLabel != "" {
  101. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  102. subscribersTimes[i] = time.Now()
  103. break
  104. }
  105. }
  106. subscribersMutexes[i].Unlock()
  107. }
  108. subscribersTimeMutexes[i].Unlock()
  109. },
  110. })
  111. }
  112. // 3
  113. if topic == masterConfig.TopicOfCameraFault && len(masterConfig.RuleOfCameraFault) > 0 {
  114. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  115. Node: commonConfig.RosNode,
  116. Topic: topic,
  117. Callback: func(data *pjisuv_msgs.FaultVec) {
  118. subscribersTimeMutexes[i].Lock()
  119. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  120. subscribersMutexes[i].Lock()
  121. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  122. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  123. var faultLabel string
  124. for _, f := range masterConfig.RuleOfCameraFault {
  125. faultLabel = f(data)
  126. if faultLabel != "" {
  127. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  128. subscribersTimes[i] = time.Now()
  129. break
  130. }
  131. }
  132. subscribersMutexes[i].Unlock()
  133. }
  134. subscribersTimeMutexes[i].Unlock()
  135. },
  136. })
  137. }
  138. // 4
  139. if topic == masterConfig.TopicOfCanData && len(masterConfig.RuleOfCanData) > 0 {
  140. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  141. Node: commonConfig.RosNode,
  142. Topic: topic,
  143. Callback: func(data *pjisuv_msgs.Frame) {
  144. subscribersTimeMutexes[i].Lock()
  145. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  146. subscribersMutexes[i].Lock()
  147. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  148. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  149. var faultLabel string
  150. for _, f := range masterConfig.RuleOfCanData {
  151. faultLabel = f(data)
  152. if faultLabel != "" {
  153. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  154. subscribersTimes[i] = time.Now()
  155. break
  156. }
  157. }
  158. subscribersMutexes[i].Unlock()
  159. }
  160. subscribersTimeMutexes[i].Unlock()
  161. },
  162. })
  163. }
  164. // 5
  165. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && len(masterConfig.RuleOfCh128x1LslidarPointCloud) > 0 {
  166. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  167. Node: commonConfig.RosNode,
  168. Topic: topic,
  169. Callback: func(data *sensor_msgs.PointCloud2) {
  170. subscribersTimeMutexes[i].Lock()
  171. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  172. subscribersMutexes[i].Lock()
  173. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  174. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  175. var faultLabel string
  176. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud {
  177. faultLabel = f(data)
  178. if faultLabel != "" {
  179. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  180. subscribersTimes[i] = time.Now()
  181. break
  182. }
  183. }
  184. subscribersMutexes[i].Unlock()
  185. }
  186. subscribersTimeMutexes[i].Unlock()
  187. },
  188. })
  189. }
  190. // 6
  191. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && len(masterConfig.RuleOfCh64wLLslidarPointCloud) > 0 {
  192. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  193. Node: commonConfig.RosNode,
  194. Topic: topic,
  195. Callback: func(data *sensor_msgs.PointCloud2) {
  196. subscribersTimeMutexes[i].Lock()
  197. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  198. subscribersMutexes[i].Lock()
  199. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  200. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  201. var faultLabel string
  202. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud {
  203. faultLabel = f(data)
  204. if faultLabel != "" {
  205. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  206. subscribersTimes[i] = time.Now()
  207. break
  208. }
  209. }
  210. subscribersMutexes[i].Unlock()
  211. }
  212. subscribersTimeMutexes[i].Unlock()
  213. },
  214. })
  215. }
  216. // 7
  217. if topic == masterConfig.TopicOfCh64wLScan && len(masterConfig.RuleOfCh64wLScan) > 0 {
  218. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  219. Node: commonConfig.RosNode,
  220. Topic: topic,
  221. Callback: func(data *sensor_msgs.LaserScan) {
  222. subscribersTimeMutexes[i].Lock()
  223. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  224. subscribersMutexes[i].Lock()
  225. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  226. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  227. var faultLabel string
  228. for _, f := range masterConfig.RuleOfCh64wLScan {
  229. faultLabel = f(data)
  230. if faultLabel != "" {
  231. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  232. subscribersTimes[i] = time.Now()
  233. break
  234. }
  235. }
  236. subscribersMutexes[i].Unlock()
  237. }
  238. subscribersTimeMutexes[i].Unlock()
  239. },
  240. })
  241. }
  242. // 8
  243. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && len(masterConfig.RuleOfCh64wRLslidarPointCloud) > 0 {
  244. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  245. Node: commonConfig.RosNode,
  246. Topic: topic,
  247. Callback: func(data *sensor_msgs.PointCloud2) {
  248. subscribersTimeMutexes[i].Lock()
  249. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  250. subscribersMutexes[i].Lock()
  251. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  252. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  253. var faultLabel string
  254. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud {
  255. faultLabel = f(data)
  256. if faultLabel != "" {
  257. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  258. subscribersTimes[i] = time.Now()
  259. break
  260. }
  261. }
  262. subscribersMutexes[i].Unlock()
  263. }
  264. subscribersTimeMutexes[i].Unlock()
  265. },
  266. })
  267. }
  268. // 9
  269. if topic == masterConfig.TopicOfCh64wRScan && len(masterConfig.RuleOfCh64wRScan) > 0 {
  270. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  271. Node: commonConfig.RosNode,
  272. Topic: topic,
  273. Callback: func(data *sensor_msgs.LaserScan) {
  274. subscribersTimeMutexes[i].Lock()
  275. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  276. subscribersMutexes[i].Lock()
  277. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  278. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  279. var faultLabel string
  280. for _, f := range masterConfig.RuleOfCh64wRScan {
  281. faultLabel = f(data)
  282. if faultLabel != "" {
  283. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  284. subscribersTimes[i] = time.Now()
  285. break
  286. }
  287. }
  288. subscribersMutexes[i].Unlock()
  289. }
  290. subscribersTimeMutexes[i].Unlock()
  291. },
  292. })
  293. }
  294. // 10
  295. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && len(masterConfig.RuleOfCicvLidarclusterMovingObjects) > 0 {
  296. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  297. Node: commonConfig.RosNode,
  298. Topic: topic,
  299. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  300. subscribersTimeMutexes[i].Lock()
  301. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  302. subscribersMutexes[i].Lock()
  303. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  304. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  305. var faultLabel string
  306. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects {
  307. faultLabel = f(data)
  308. if faultLabel != "" {
  309. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  310. subscribersTimes[i] = time.Now()
  311. break
  312. }
  313. }
  314. subscribersMutexes[i].Unlock()
  315. }
  316. subscribersTimeMutexes[i].Unlock()
  317. },
  318. })
  319. }
  320. // 11
  321. if topic == masterConfig.TopicOfCicvAmrTrajectory && len(masterConfig.RuleOfCicvAmrTrajectory) > 0 {
  322. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  323. Node: commonConfig.RosNode,
  324. Topic: topic,
  325. Callback: func(data *pjisuv_msgs.Trajectory) {
  326. subscribersTimeMutexes[i].Lock()
  327. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  328. subscribersMutexes[i].Lock()
  329. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  330. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  331. var faultLabel string
  332. for _, f := range masterConfig.RuleOfCicvAmrTrajectory {
  333. faultLabel = f(data)
  334. if faultLabel != "" {
  335. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  336. subscribersTimes[i] = time.Now()
  337. break
  338. }
  339. }
  340. subscribersMutexes[i].Unlock()
  341. }
  342. subscribersTimeMutexes[i].Unlock()
  343. },
  344. })
  345. }
  346. // 12
  347. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  348. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  349. Node: commonConfig.RosNode,
  350. Topic: topic,
  351. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  352. // 更新共享变量
  353. mutexOfCicvLocation.RLock()
  354. {
  355. pjisuvParam.VelocityYOfCicvLocation = data.VelocityX
  356. pjisuvParam.VelocityYOfCicvLocation = data.VelocityY
  357. pjisuvParam.VelocityYOfCicvLocation = data.VelocityZ
  358. pjisuvParam.YawOfCicvLocation = data.Yaw
  359. pjisuvParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
  360. pjisuvParam.PositionXOfCicvLocation = data.PositionX
  361. pjisuvParam.PositionYOfCicvLocation = data.PositionY
  362. }
  363. mutexOfCicvLocation.RUnlock()
  364. subscribersTimeMutexes[i].Lock()
  365. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  366. subscribersMutexes[i].Lock()
  367. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  368. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  369. var faultLabel string
  370. for _, f := range masterConfig.RuleOfCicvLocation {
  371. faultLabel = f(data)
  372. if faultLabel != "" {
  373. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  374. break
  375. }
  376. }
  377. subscribersMutexes[i].Unlock()
  378. }
  379. subscribersTimeMutexes[i].Unlock()
  380. },
  381. })
  382. }
  383. // 13
  384. if topic == masterConfig.TopicOfCloudClusters && len(masterConfig.RuleOfCloudClusters) > 0 {
  385. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  386. Node: commonConfig.RosNode,
  387. Topic: topic,
  388. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  389. subscribersTimeMutexes[i].Lock()
  390. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  391. subscribersMutexes[i].Lock()
  392. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  393. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  394. var faultLabel string
  395. for _, f := range masterConfig.RuleOfCloudClusters {
  396. faultLabel = f(data)
  397. if faultLabel != "" {
  398. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  399. subscribersTimes[i] = time.Now()
  400. break
  401. }
  402. }
  403. subscribersMutexes[i].Unlock()
  404. }
  405. subscribersTimeMutexes[i].Unlock()
  406. },
  407. })
  408. }
  409. // 14
  410. if topic == masterConfig.TopicOfHeartbeatInfo && len(masterConfig.RuleOfHeartbeatInfo) > 0 {
  411. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  412. Node: commonConfig.RosNode,
  413. Topic: topic,
  414. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  415. subscribersTimeMutexes[i].Lock()
  416. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  417. subscribersMutexes[i].Lock()
  418. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  419. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  420. var faultLabel string
  421. for _, f := range masterConfig.RuleOfHeartbeatInfo {
  422. faultLabel = f(data)
  423. if faultLabel != "" {
  424. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  425. subscribersTimes[i] = time.Now()
  426. break
  427. }
  428. }
  429. subscribersMutexes[i].Unlock()
  430. }
  431. subscribersTimeMutexes[i].Unlock()
  432. },
  433. })
  434. }
  435. // 15
  436. if topic == masterConfig.TopicOfLidarPretreatmentCost && len(masterConfig.RuleOfLidarPretreatmentCost) > 0 {
  437. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  438. Node: commonConfig.RosNode,
  439. Topic: topic,
  440. Callback: func(data *geometry_msgs.Vector3Stamped) {
  441. subscribersTimeMutexes[i].Lock()
  442. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  443. subscribersMutexes[i].Lock()
  444. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  445. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  446. var faultLabel string
  447. for _, f := range masterConfig.RuleOfLidarPretreatmentCost {
  448. faultLabel = f(data)
  449. if faultLabel != "" {
  450. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  451. subscribersTimes[i] = time.Now()
  452. break
  453. }
  454. }
  455. subscribersMutexes[i].Unlock()
  456. }
  457. subscribersTimeMutexes[i].Unlock()
  458. },
  459. })
  460. }
  461. // 16
  462. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && len(masterConfig.RuleOfLidarPretreatmentOdometry) > 0 {
  463. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  464. Node: commonConfig.RosNode,
  465. Topic: topic,
  466. Callback: func(data *nav_msgs.Odometry) {
  467. subscribersTimeMutexes[i].Lock()
  468. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  469. subscribersMutexes[i].Lock()
  470. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  471. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  472. var faultLabel string
  473. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry {
  474. faultLabel = f(data)
  475. if faultLabel != "" {
  476. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  477. subscribersTimes[i] = time.Now()
  478. break
  479. }
  480. }
  481. subscribersMutexes[i].Unlock()
  482. }
  483. subscribersTimeMutexes[i].Unlock()
  484. },
  485. })
  486. }
  487. // 17
  488. if topic == masterConfig.TopicOfLidarRoi && len(masterConfig.RuleOfLidarRoi) > 0 {
  489. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  490. Node: commonConfig.RosNode,
  491. Topic: topic,
  492. Callback: func(data *geometry_msgs.PolygonStamped) {
  493. subscribersTimeMutexes[i].Lock()
  494. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  495. subscribersMutexes[i].Lock()
  496. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  497. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  498. var faultLabel string
  499. for _, f := range masterConfig.RuleOfLidarRoi {
  500. faultLabel = f(data)
  501. if faultLabel != "" {
  502. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  503. subscribersTimes[i] = time.Now()
  504. break
  505. }
  506. }
  507. subscribersMutexes[i].Unlock()
  508. }
  509. subscribersTimeMutexes[i].Unlock()
  510. },
  511. })
  512. }
  513. // 18
  514. if topic == masterConfig.TopicOfLine1 && len(masterConfig.RuleOfLine1) > 0 {
  515. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  516. Node: commonConfig.RosNode,
  517. Topic: topic,
  518. Callback: func(data *nav_msgs.Path) {
  519. subscribersTimeMutexes[i].Lock()
  520. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  521. subscribersMutexes[i].Lock()
  522. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  523. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  524. var faultLabel string
  525. for _, f := range masterConfig.RuleOfLine1 {
  526. faultLabel = f(data)
  527. if faultLabel != "" {
  528. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  529. subscribersTimes[i] = time.Now()
  530. break
  531. }
  532. }
  533. subscribersMutexes[i].Unlock()
  534. }
  535. subscribersTimeMutexes[i].Unlock()
  536. },
  537. })
  538. }
  539. // 19
  540. if topic == masterConfig.TopicOfLine2 && len(masterConfig.RuleOfLine2) > 0 {
  541. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  542. Node: commonConfig.RosNode,
  543. Topic: topic,
  544. Callback: func(data *nav_msgs.Path) {
  545. subscribersTimeMutexes[i].Lock()
  546. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  547. subscribersMutexes[i].Lock()
  548. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  549. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  550. var faultLabel string
  551. for _, f := range masterConfig.RuleOfLine2 {
  552. faultLabel = f(data)
  553. if faultLabel != "" {
  554. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  555. subscribersTimes[i] = time.Now()
  556. break
  557. }
  558. }
  559. subscribersMutexes[i].Unlock()
  560. }
  561. subscribersTimeMutexes[i].Unlock()
  562. },
  563. })
  564. }
  565. // 20
  566. if topic == masterConfig.TopicOfMapPolygon && len(masterConfig.RuleOfMapPolygon) > 0 {
  567. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  568. Node: commonConfig.RosNode,
  569. Topic: topic,
  570. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  571. subscribersTimeMutexes[i].Lock()
  572. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  573. subscribersMutexes[i].Lock()
  574. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  575. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  576. var faultLabel string
  577. for _, f := range masterConfig.RuleOfMapPolygon {
  578. faultLabel = f(data)
  579. if faultLabel != "" {
  580. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  581. subscribersTimes[i] = time.Now()
  582. break
  583. }
  584. }
  585. subscribersMutexes[i].Unlock()
  586. }
  587. subscribersTimeMutexes[i].Unlock()
  588. },
  589. })
  590. }
  591. // 21
  592. if topic == masterConfig.TopicOfObstacleDisplay && len(masterConfig.RuleOfObstacleDisplay) > 0 {
  593. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  594. Node: commonConfig.RosNode,
  595. Topic: topic,
  596. Callback: func(data *visualization_msgs.MarkerArray) {
  597. subscribersTimeMutexes[i].Lock()
  598. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  599. subscribersMutexes[i].Lock()
  600. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  601. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  602. var faultLabel string
  603. for _, f := range masterConfig.RuleOfObstacleDisplay {
  604. faultLabel = f(data)
  605. if faultLabel != "" {
  606. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  607. subscribersTimes[i] = time.Now()
  608. break
  609. }
  610. }
  611. subscribersMutexes[i].Unlock()
  612. }
  613. subscribersTimeMutexes[i].Unlock()
  614. },
  615. })
  616. }
  617. // 22
  618. if topic == masterConfig.TopicOfPjControlPub && len(masterConfig.RuleOfPjControlPub) > 0 {
  619. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  620. Node: commonConfig.RosNode,
  621. Topic: topic,
  622. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  623. // 更新共享变量
  624. mutexOfPjControlPub.RLock()
  625. {
  626. pjisuvParam.NumCountPjiControlCommandOfPjControlPub++
  627. if pjisuvParam.NumCountPjiControlCommandOfPjControlPub == 10 {
  628. pjisuvParam.EgoSteeringCmdOfPjControlPub = append(pjisuvParam.EgoSteeringCmdOfPjControlPub, data.ICPVCmdStrAngle)
  629. pjisuvParam.EgoThrottleCmdOfPjControlPub = append(pjisuvParam.EgoThrottleCmdOfPjControlPub, data.ICPVCmdAccPelPosAct)
  630. pjisuvParam.NumCountPjiControlCommandOfPjControlPub = 0
  631. }
  632. }
  633. mutexOfPjControlPub.RUnlock()
  634. subscribersTimeMutexes[i].Lock()
  635. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  636. subscribersMutexes[i].Lock()
  637. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  638. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  639. var faultLabel string
  640. for _, f := range masterConfig.RuleOfPjControlPub {
  641. faultLabel = f(data)
  642. if faultLabel != "" {
  643. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  644. subscribersTimes[i] = time.Now()
  645. break
  646. }
  647. }
  648. subscribersMutexes[i].Unlock()
  649. }
  650. subscribersTimeMutexes[i].Unlock()
  651. },
  652. })
  653. }
  654. // 23
  655. if topic == masterConfig.TopicOfPointsCluster && len(masterConfig.RuleOfPointsCluster) > 0 {
  656. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  657. Node: commonConfig.RosNode,
  658. Topic: topic,
  659. Callback: func(data *sensor_msgs.PointCloud2) {
  660. subscribersTimeMutexes[i].Lock()
  661. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  662. subscribersMutexes[i].Lock()
  663. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  664. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  665. var faultLabel string
  666. for _, f := range masterConfig.RuleOfPointsCluster {
  667. faultLabel = f(data)
  668. if faultLabel != "" {
  669. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  670. subscribersTimes[i] = time.Now()
  671. break
  672. }
  673. }
  674. subscribersMutexes[i].Unlock()
  675. }
  676. subscribersTimeMutexes[i].Unlock()
  677. },
  678. })
  679. }
  680. // 24
  681. if topic == masterConfig.TopicOfPointsConcat && len(masterConfig.RuleOfPointsConcat) > 0 {
  682. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  683. Node: commonConfig.RosNode,
  684. Topic: topic,
  685. Callback: func(data *sensor_msgs.PointCloud2) {
  686. subscribersTimeMutexes[i].Lock()
  687. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  688. subscribersMutexes[i].Lock()
  689. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  690. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  691. var faultLabel string
  692. for _, f := range masterConfig.RuleOfPointsConcat {
  693. faultLabel = f(data)
  694. if faultLabel != "" {
  695. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  696. subscribersTimes[i] = time.Now()
  697. break
  698. }
  699. }
  700. subscribersMutexes[i].Unlock()
  701. }
  702. subscribersTimeMutexes[i].Unlock()
  703. },
  704. })
  705. }
  706. // 25
  707. if topic == masterConfig.TopicOfReferenceDisplay && len(masterConfig.RuleOfReferenceDisplay) > 0 {
  708. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  709. Node: commonConfig.RosNode,
  710. Topic: topic,
  711. Callback: func(data *nav_msgs.Path) {
  712. subscribersTimeMutexes[i].Lock()
  713. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  714. subscribersMutexes[i].Lock()
  715. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  716. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  717. var faultLabel string
  718. for _, f := range masterConfig.RuleOfReferenceDisplay {
  719. faultLabel = f(data)
  720. if faultLabel != "" {
  721. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  722. subscribersTimes[i] = time.Now()
  723. break
  724. }
  725. }
  726. subscribersMutexes[i].Unlock()
  727. }
  728. subscribersTimeMutexes[i].Unlock()
  729. },
  730. })
  731. }
  732. // 26
  733. if topic == masterConfig.TopicOfReferenceTrajectory && len(masterConfig.RuleOfReferenceTrajectory) > 0 {
  734. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  735. Node: commonConfig.RosNode,
  736. Topic: topic,
  737. Callback: func(data *pjisuv_msgs.Trajectory) {
  738. subscribersTimeMutexes[i].Lock()
  739. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  740. subscribersMutexes[i].Lock()
  741. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  742. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  743. var faultLabel string
  744. for _, f := range masterConfig.RuleOfReferenceTrajectory {
  745. faultLabel = f(data)
  746. if faultLabel != "" {
  747. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  748. subscribersTimes[i] = time.Now()
  749. break
  750. }
  751. }
  752. subscribersMutexes[i].Unlock()
  753. }
  754. subscribersTimeMutexes[i].Unlock()
  755. },
  756. })
  757. }
  758. // 27
  759. if topic == masterConfig.TopicOfRoiPoints && len(masterConfig.RuleOfRoiPoints) > 0 {
  760. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  761. Node: commonConfig.RosNode,
  762. Topic: topic,
  763. Callback: func(data *sensor_msgs.PointCloud2) {
  764. subscribersTimeMutexes[i].Lock()
  765. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  766. subscribersMutexes[i].Lock()
  767. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  768. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  769. var faultLabel string
  770. for _, f := range masterConfig.RuleOfRoiPoints {
  771. faultLabel = f(data)
  772. if faultLabel != "" {
  773. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  774. subscribersTimes[i] = time.Now()
  775. break
  776. }
  777. }
  778. subscribersMutexes[i].Unlock()
  779. }
  780. subscribersTimeMutexes[i].Unlock()
  781. },
  782. })
  783. }
  784. // 28
  785. if topic == masterConfig.TopicOfRoiPolygon && len(masterConfig.RuleOfRoiPolygon) > 0 {
  786. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  787. Node: commonConfig.RosNode,
  788. Topic: topic,
  789. Callback: func(data *nav_msgs.Path) {
  790. subscribersTimeMutexes[i].Lock()
  791. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  792. subscribersMutexes[i].Lock()
  793. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  794. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  795. var faultLabel string
  796. for _, f := range masterConfig.RuleOfRoiPolygon {
  797. faultLabel = f(data)
  798. if faultLabel != "" {
  799. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  800. subscribersTimes[i] = time.Now()
  801. break
  802. }
  803. }
  804. subscribersMutexes[i].Unlock()
  805. }
  806. subscribersTimeMutexes[i].Unlock()
  807. },
  808. })
  809. }
  810. // 29
  811. if topic == masterConfig.TopicOfTf && len(masterConfig.RuleOfTf) > 0 {
  812. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  813. Node: commonConfig.RosNode,
  814. Topic: topic,
  815. Callback: func(data *tf2_msgs.TFMessage) {
  816. subscribersTimeMutexes[i].Lock()
  817. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  818. subscribersMutexes[i].Lock()
  819. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  820. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  821. var faultLabel string
  822. for _, f := range masterConfig.RuleOfTf {
  823. faultLabel = f(data)
  824. if faultLabel != "" {
  825. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  826. subscribersTimes[i] = time.Now()
  827. break
  828. }
  829. }
  830. subscribersMutexes[i].Unlock()
  831. }
  832. subscribersTimeMutexes[i].Unlock()
  833. },
  834. })
  835. }
  836. // 30
  837. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  838. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  839. Node: commonConfig.RosNode,
  840. Topic: topic,
  841. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  842. // 更新共享变量
  843. mutexOfTpperception.RLock()
  844. {
  845. for _, obj := range data.Objs {
  846. if obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  847. continue
  848. }
  849. if _, ok := pjisuvParam.ObjDicOfTpperception[obj.Id]; !ok {
  850. pjisuvParam.ObjDicOfTpperception[obj.Id] = []float32{}
  851. }
  852. pjisuvParam.ObjDicOfTpperception[obj.Id] = append(pjisuvParam.ObjDicOfTpperception[obj.Id], obj.Y)
  853. }
  854. }
  855. mutexOfTpperception.RUnlock()
  856. subscribersTimeMutexes[i].Lock()
  857. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  858. subscribersMutexes[i].Lock()
  859. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  860. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  861. var faultLabel string
  862. for _, f := range masterConfig.RuleOfTpperception {
  863. faultLabel = f(data, pjisuvParam)
  864. if faultLabel != "" {
  865. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  866. break
  867. }
  868. }
  869. subscribersMutexes[i].Unlock()
  870. }
  871. subscribersTimeMutexes[i].Unlock()
  872. },
  873. })
  874. }
  875. // 31
  876. if topic == masterConfig.TopicOfTpperceptionVis && len(masterConfig.RuleOfTpperceptionVis) > 0 {
  877. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  878. Node: commonConfig.RosNode,
  879. Topic: topic,
  880. Callback: func(data *visualization_msgs.MarkerArray) {
  881. subscribersTimeMutexes[i].Lock()
  882. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  883. subscribersMutexes[i].Lock()
  884. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  885. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  886. var faultLabel string
  887. for _, f := range masterConfig.RuleOfTpperceptionVis {
  888. faultLabel = f(data)
  889. if faultLabel != "" {
  890. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  891. subscribersTimes[i] = time.Now()
  892. break
  893. }
  894. }
  895. subscribersMutexes[i].Unlock()
  896. }
  897. subscribersTimeMutexes[i].Unlock()
  898. },
  899. })
  900. }
  901. // 32
  902. if topic == masterConfig.TopicOfTprouteplan && len(masterConfig.RuleOfTprouteplan) > 0 {
  903. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  904. Node: commonConfig.RosNode,
  905. Topic: topic,
  906. Callback: func(data *pjisuv_msgs.RoutePlan) {
  907. subscribersTimeMutexes[i].Lock()
  908. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  909. subscribersMutexes[i].Lock()
  910. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  911. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  912. var faultLabel string
  913. for _, f := range masterConfig.RuleOfTprouteplan {
  914. faultLabel = f(data)
  915. if faultLabel != "" {
  916. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  917. subscribersTimes[i] = time.Now()
  918. break
  919. }
  920. }
  921. subscribersMutexes[i].Unlock()
  922. }
  923. subscribersTimeMutexes[i].Unlock()
  924. },
  925. })
  926. }
  927. // 33
  928. if topic == masterConfig.TopicOfTrajectoryDisplay && len(masterConfig.RuleOfTrajectoryDisplay) > 0 {
  929. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  930. Node: commonConfig.RosNode,
  931. Topic: topic,
  932. Callback: func(data *nav_msgs.Path) {
  933. subscribersTimeMutexes[i].Lock()
  934. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  935. subscribersMutexes[i].Lock()
  936. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  937. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  938. var faultLabel string
  939. for _, f := range masterConfig.RuleOfTrajectoryDisplay {
  940. faultLabel = f(data)
  941. if faultLabel != "" {
  942. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  943. subscribersTimes[i] = time.Now()
  944. break
  945. }
  946. }
  947. subscribersMutexes[i].Unlock()
  948. }
  949. subscribersTimeMutexes[i].Unlock()
  950. },
  951. })
  952. }
  953. // 34
  954. if topic == masterConfig.TopicOfUngroundCloudpoints && len(masterConfig.RuleOfUngroundCloudpoints) > 0 {
  955. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  956. Node: commonConfig.RosNode,
  957. Topic: topic,
  958. Callback: func(data *sensor_msgs.PointCloud2) {
  959. subscribersTimeMutexes[i].Lock()
  960. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  961. subscribersMutexes[i].Lock()
  962. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  963. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  964. var faultLabel string
  965. for _, f := range masterConfig.RuleOfUngroundCloudpoints {
  966. faultLabel = f(data)
  967. if faultLabel != "" {
  968. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  969. subscribersTimes[i] = time.Now()
  970. break
  971. }
  972. }
  973. subscribersMutexes[i].Unlock()
  974. }
  975. subscribersTimeMutexes[i].Unlock()
  976. },
  977. })
  978. }
  979. // 35
  980. if topic == masterConfig.TopicOfCameraImage && len(masterConfig.RuleOfCameraImage) > 0 {
  981. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  982. Node: commonConfig.RosNode,
  983. Topic: topic,
  984. Callback: func(data *sensor_msgs.Image) {
  985. subscribersTimeMutexes[i].Lock()
  986. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  987. subscribersMutexes[i].Lock()
  988. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  989. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  990. var faultLabel string
  991. for _, f := range masterConfig.RuleOfCameraImage {
  992. faultLabel = f(data)
  993. if faultLabel != "" {
  994. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  995. subscribersTimes[i] = time.Now()
  996. break
  997. }
  998. }
  999. subscribersMutexes[i].Unlock()
  1000. }
  1001. subscribersTimeMutexes[i].Unlock()
  1002. },
  1003. })
  1004. }
  1005. // 36
  1006. if topic == masterConfig.TopicOfDataRead && len(masterConfig.RuleOfDataRead) > 0 {
  1007. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1008. Node: commonConfig.RosNode,
  1009. Topic: topic,
  1010. Callback: func(data *pjisuv_msgs.Retrieval) {
  1011. // 更新共享变量
  1012. mutexOfDataRead.RLock()
  1013. {
  1014. pjisuvParam.NumCountDataReadOfDataRead++
  1015. if pjisuvParam.NumCountDataReadOfDataRead == 10 {
  1016. pjisuvParam.EgoSteeringRealOfDataRead = append(pjisuvParam.EgoSteeringRealOfDataRead, data.ActStrWhAng)
  1017. pjisuvParam.EgoThrottleRealOfDataRead = append(pjisuvParam.EgoThrottleRealOfDataRead, data.AccPed2)
  1018. pjisuvParam.NumCountDataReadOfDataRead = 0
  1019. }
  1020. pjisuvParam.StrgAngleRealValueOfDataRead = data.ActStrWhAng
  1021. }
  1022. mutexOfDataRead.RUnlock()
  1023. subscribersTimeMutexes[i].Lock()
  1024. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1025. subscribersMutexes[i].Lock()
  1026. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1027. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1028. var faultLabel string
  1029. for _, f := range masterConfig.RuleOfDataRead {
  1030. faultLabel = f(data)
  1031. if faultLabel != "" {
  1032. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1033. subscribersTimes[i] = time.Now()
  1034. break
  1035. }
  1036. }
  1037. subscribersMutexes[i].Unlock()
  1038. }
  1039. subscribersTimeMutexes[i].Unlock()
  1040. },
  1041. })
  1042. }
  1043. // 37
  1044. if topic == masterConfig.TopicOfPjiGps && len(masterConfig.RuleOfPjiGps) > 0 {
  1045. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1046. Node: commonConfig.RosNode,
  1047. Topic: topic,
  1048. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  1049. subscribersTimeMutexes[i].Lock()
  1050. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1051. subscribersMutexes[i].Lock()
  1052. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1053. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1054. var faultLabel string
  1055. for _, f := range masterConfig.RuleOfPjiGps {
  1056. faultLabel = f(data)
  1057. if faultLabel != "" {
  1058. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1059. subscribersTimes[i] = time.Now()
  1060. break
  1061. }
  1062. }
  1063. subscribersMutexes[i].Unlock()
  1064. }
  1065. subscribersTimeMutexes[i].Unlock()
  1066. },
  1067. })
  1068. }
  1069. // 39
  1070. if topic == masterConfig.TopicOfPjVehicleFdbPub && len(masterConfig.RuleOfPjVehicleFdbPub) > 0 {
  1071. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1072. Node: commonConfig.RosNode,
  1073. Topic: topic,
  1074. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  1075. // 更新共享变量
  1076. mutexOfPjControlPub.RLock()
  1077. {
  1078. pjisuvParam.AutomodeOfPjVehicleFdbPub = data.Automode
  1079. }
  1080. mutexOfPjControlPub.RUnlock()
  1081. subscribersTimeMutexes[i].Lock()
  1082. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1083. subscribersMutexes[i].Lock()
  1084. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1085. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1086. var faultLabel string
  1087. for _, f := range masterConfig.RuleOfPjVehicleFdbPub {
  1088. faultLabel = f(data, pjisuvParam)
  1089. if faultLabel != "" {
  1090. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1091. subscribersTimes[i] = time.Now()
  1092. break
  1093. }
  1094. }
  1095. subscribersMutexes[i].Unlock()
  1096. }
  1097. subscribersTimeMutexes[i].Unlock()
  1098. },
  1099. })
  1100. }
  1101. if err != nil {
  1102. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1103. //TODO 如何回传日志
  1104. continue
  1105. }
  1106. }
  1107. select {
  1108. case signal := <-service.ChannelKillWindowProducer:
  1109. if signal == 1 {
  1110. commonConfig.RosNode.Close()
  1111. service.AddKillTimes("3")
  1112. return
  1113. }
  1114. }
  1115. }
  1116. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  1117. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1118. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  1119. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  1120. newTimeWindow := commonEntity.TimeWindow{
  1121. FaultTime: faultHappenTime,
  1122. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  1123. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  1124. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  1125. Labels: []string{faultLabel},
  1126. MasterTopics: masterTopics,
  1127. SlaveTopics: slaveTopics,
  1128. }
  1129. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1130. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1131. } else {
  1132. // 2-2 如果在旧故障窗口内
  1133. commonEntity.TimeWindowProducerQueueMutex.RLock()
  1134. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  1135. // 2-2-1 更新故障窗口end时间
  1136. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  1137. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1138. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  1139. lastTimeWindow.TimeWindowEnd = maxEnd
  1140. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  1141. } else {
  1142. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1143. lastTimeWindow.TimeWindowEnd = expectEnd
  1144. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1145. }
  1146. }
  1147. // 2-2-2 更新label
  1148. labels := lastTimeWindow.Labels
  1149. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1150. // 2-2-3 更新 topic
  1151. sourceMasterTopics := lastTimeWindow.MasterTopics
  1152. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1153. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1154. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1155. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1156. }
  1157. }
  1158. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1159. // 获取所有需要采集的topic
  1160. var faultCodeTopics []string
  1161. for _, code := range commonConfig.CloudConfig.Triggers {
  1162. if code.Label == faultLabel {
  1163. faultCodeTopics = code.Topics
  1164. }
  1165. }
  1166. // 根据不同节点采集的topic进行分配采集
  1167. for _, acceptTopic := range faultCodeTopics {
  1168. for _, host := range commonConfig.CloudConfig.Hosts {
  1169. for _, topic := range host.Topics {
  1170. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1171. masterTopics = append(masterTopics, acceptTopic)
  1172. }
  1173. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1174. slaveTopics = append(slaveTopics, acceptTopic)
  1175. }
  1176. }
  1177. }
  1178. }
  1179. return masterTopics, slaveTopics
  1180. }