produce_window.go 43 KB


  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "github.com/bluenviron/goroslib/v2"
  11. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  16. "math"
  17. "sync"
  18. "time"
  19. )
  20. // 所有共享变量
  21. var (
  22. extendParam commonEntity.PjisuvParam
  23. // /cicv_location
  24. mutexOfCicvLocation sync.RWMutex
  25. // /tpperception
  26. mutexOfTpperception sync.RWMutex
  27. // /pj_control_pub
  28. mutexOfPjControlPub sync.RWMutex
  29. // /data_read
  30. mutexOfDataRead sync.RWMutex
  31. )
  32. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  33. func PrepareTimeWindowProducerQueue() {
  34. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  35. var err error
  36. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  37. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  38. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  39. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  40. for i, topic := range commonConfig.SubscribeTopics {
  41. // !!!扩展的定时任务监听,牛逼的设计!!!扩展性拉满啦
  42. if topic == masterConfig.TopicOfCicvExtend {
  43. go func() {
  44. for {
  45. time.Sleep(time.Duration(3500) * time.Millisecond)
  46. for _, f := range masterConfig.RuleOfCicvExtend {
  47. label := f(extendParam)
  48. if label != "" {
  49. saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
  50. break
  51. }
  52. }
  53. }
  54. }()
  55. }
  56. // 其他常规监听器
  57. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  58. // 1
  59. if topic == masterConfig.TopicOfAmrPose && len(masterConfig.RuleOfAmrPose) > 0 {
  60. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  61. Node: commonConfig.RosNode,
  62. Topic: topic,
  63. Callback: func(data *visualization_msgs.MarkerArray) {
  64. subscribersTimeMutexes[i].Lock()
  65. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  66. subscribersMutexes[i].Lock()
  67. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  68. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  69. var faultLabel string
  70. for _, f := range masterConfig.RuleOfAmrPose {
  71. faultLabel = f(data)
  72. if faultLabel != "" {
  73. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  74. subscribersTimes[i] = time.Now()
  75. break
  76. }
  77. }
  78. subscribersMutexes[i].Unlock()
  79. }
  80. subscribersTimeMutexes[i].Unlock()
  81. },
  82. })
  83. }
  84. // 2
  85. if topic == masterConfig.TopicOfBoundingBoxesFast && len(masterConfig.RuleOfBoundingBoxesFast) > 0 {
  86. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  87. Node: commonConfig.RosNode,
  88. Topic: topic,
  89. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  90. subscribersTimeMutexes[i].Lock()
  91. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  92. subscribersMutexes[i].Lock()
  93. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  94. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  95. var faultLabel string
  96. for _, f := range masterConfig.RuleOfBoundingBoxesFast {
  97. faultLabel = f(data)
  98. if faultLabel != "" {
  99. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  100. subscribersTimes[i] = time.Now()
  101. break
  102. }
  103. }
  104. subscribersMutexes[i].Unlock()
  105. }
  106. subscribersTimeMutexes[i].Unlock()
  107. },
  108. })
  109. }
  110. // 3
  111. if topic == masterConfig.TopicOfCameraFault && len(masterConfig.RuleOfCameraFault) > 0 {
  112. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  113. Node: commonConfig.RosNode,
  114. Topic: topic,
  115. Callback: func(data *pjisuv_msgs.FaultVec) {
  116. subscribersTimeMutexes[i].Lock()
  117. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  118. subscribersMutexes[i].Lock()
  119. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  120. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  121. var faultLabel string
  122. for _, f := range masterConfig.RuleOfCameraFault {
  123. faultLabel = f(data)
  124. if faultLabel != "" {
  125. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  126. subscribersTimes[i] = time.Now()
  127. break
  128. }
  129. }
  130. subscribersMutexes[i].Unlock()
  131. }
  132. subscribersTimeMutexes[i].Unlock()
  133. },
  134. })
  135. }
  136. // 4
  137. if topic == masterConfig.TopicOfCanData && len(masterConfig.RuleOfCanData) > 0 {
  138. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  139. Node: commonConfig.RosNode,
  140. Topic: topic,
  141. Callback: func(data *pjisuv_msgs.Frame) {
  142. subscribersTimeMutexes[i].Lock()
  143. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  144. subscribersMutexes[i].Lock()
  145. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  146. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  147. var faultLabel string
  148. for _, f := range masterConfig.RuleOfCanData {
  149. faultLabel = f(data)
  150. if faultLabel != "" {
  151. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  152. subscribersTimes[i] = time.Now()
  153. break
  154. }
  155. }
  156. subscribersMutexes[i].Unlock()
  157. }
  158. subscribersTimeMutexes[i].Unlock()
  159. },
  160. })
  161. }
  162. // 5
  163. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && len(masterConfig.RuleOfCh128x1LslidarPointCloud) > 0 {
  164. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  165. Node: commonConfig.RosNode,
  166. Topic: topic,
  167. Callback: func(data *sensor_msgs.PointCloud2) {
  168. subscribersTimeMutexes[i].Lock()
  169. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  170. subscribersMutexes[i].Lock()
  171. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  172. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  173. var faultLabel string
  174. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud {
  175. faultLabel = f(data)
  176. if faultLabel != "" {
  177. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  178. subscribersTimes[i] = time.Now()
  179. break
  180. }
  181. }
  182. subscribersMutexes[i].Unlock()
  183. }
  184. subscribersTimeMutexes[i].Unlock()
  185. },
  186. })
  187. }
  188. // 6
  189. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && len(masterConfig.RuleOfCh64wLLslidarPointCloud) > 0 {
  190. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  191. Node: commonConfig.RosNode,
  192. Topic: topic,
  193. Callback: func(data *sensor_msgs.PointCloud2) {
  194. subscribersTimeMutexes[i].Lock()
  195. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  196. subscribersMutexes[i].Lock()
  197. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  198. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  199. var faultLabel string
  200. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud {
  201. faultLabel = f(data)
  202. if faultLabel != "" {
  203. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  204. subscribersTimes[i] = time.Now()
  205. break
  206. }
  207. }
  208. subscribersMutexes[i].Unlock()
  209. }
  210. subscribersTimeMutexes[i].Unlock()
  211. },
  212. })
  213. }
  214. // 7
  215. if topic == masterConfig.TopicOfCh64wLScan && len(masterConfig.RuleOfCh64wLScan) > 0 {
  216. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  217. Node: commonConfig.RosNode,
  218. Topic: topic,
  219. Callback: func(data *sensor_msgs.LaserScan) {
  220. subscribersTimeMutexes[i].Lock()
  221. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  222. subscribersMutexes[i].Lock()
  223. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  224. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  225. var faultLabel string
  226. for _, f := range masterConfig.RuleOfCh64wLScan {
  227. faultLabel = f(data)
  228. if faultLabel != "" {
  229. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  230. subscribersTimes[i] = time.Now()
  231. break
  232. }
  233. }
  234. subscribersMutexes[i].Unlock()
  235. }
  236. subscribersTimeMutexes[i].Unlock()
  237. },
  238. })
  239. }
  240. // 8
  241. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && len(masterConfig.RuleOfCh64wRLslidarPointCloud) > 0 {
  242. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  243. Node: commonConfig.RosNode,
  244. Topic: topic,
  245. Callback: func(data *sensor_msgs.PointCloud2) {
  246. subscribersTimeMutexes[i].Lock()
  247. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  248. subscribersMutexes[i].Lock()
  249. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  250. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  251. var faultLabel string
  252. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud {
  253. faultLabel = f(data)
  254. if faultLabel != "" {
  255. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  256. subscribersTimes[i] = time.Now()
  257. break
  258. }
  259. }
  260. subscribersMutexes[i].Unlock()
  261. }
  262. subscribersTimeMutexes[i].Unlock()
  263. },
  264. })
  265. }
  266. // 9
  267. if topic == masterConfig.TopicOfCh64wRScan && len(masterConfig.RuleOfCh64wRScan) > 0 {
  268. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  269. Node: commonConfig.RosNode,
  270. Topic: topic,
  271. Callback: func(data *sensor_msgs.LaserScan) {
  272. subscribersTimeMutexes[i].Lock()
  273. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  274. subscribersMutexes[i].Lock()
  275. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  276. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  277. var faultLabel string
  278. for _, f := range masterConfig.RuleOfCh64wRScan {
  279. faultLabel = f(data)
  280. if faultLabel != "" {
  281. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  282. subscribersTimes[i] = time.Now()
  283. break
  284. }
  285. }
  286. subscribersMutexes[i].Unlock()
  287. }
  288. subscribersTimeMutexes[i].Unlock()
  289. },
  290. })
  291. }
  292. // 10
  293. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && len(masterConfig.RuleOfCicvLidarclusterMovingObjects) > 0 {
  294. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  295. Node: commonConfig.RosNode,
  296. Topic: topic,
  297. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  298. subscribersTimeMutexes[i].Lock()
  299. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  300. subscribersMutexes[i].Lock()
  301. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  302. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  303. var faultLabel string
  304. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects {
  305. faultLabel = f(data)
  306. if faultLabel != "" {
  307. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  308. subscribersTimes[i] = time.Now()
  309. break
  310. }
  311. }
  312. subscribersMutexes[i].Unlock()
  313. }
  314. subscribersTimeMutexes[i].Unlock()
  315. },
  316. })
  317. }
  318. // 11
  319. if topic == masterConfig.TopicOfCicvAmrTrajectory && len(masterConfig.RuleOfCicvAmrTrajectory) > 0 {
  320. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  321. Node: commonConfig.RosNode,
  322. Topic: topic,
  323. Callback: func(data *pjisuv_msgs.Trajectory) {
  324. subscribersTimeMutexes[i].Lock()
  325. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  326. subscribersMutexes[i].Lock()
  327. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  328. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  329. var faultLabel string
  330. for _, f := range masterConfig.RuleOfCicvAmrTrajectory {
  331. faultLabel = f(data)
  332. if faultLabel != "" {
  333. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  334. subscribersTimes[i] = time.Now()
  335. break
  336. }
  337. }
  338. subscribersMutexes[i].Unlock()
  339. }
  340. subscribersTimeMutexes[i].Unlock()
  341. },
  342. })
  343. }
  344. // 12
  345. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  346. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  347. Node: commonConfig.RosNode,
  348. Topic: topic,
  349. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  350. // 更新共享变量
  351. mutexOfCicvLocation.RLock()
  352. {
  353. extendParam.VelocityYOfCicvLocation = data.VelocityX
  354. extendParam.VelocityYOfCicvLocation = data.VelocityY
  355. extendParam.YawOfCicvLocation = data.Yaw
  356. extendParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
  357. }
  358. mutexOfCicvLocation.RUnlock()
  359. subscribersTimeMutexes[i].Lock()
  360. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  361. subscribersMutexes[i].Lock()
  362. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  363. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  364. var faultLabel string
  365. for _, f := range masterConfig.RuleOfCicvLocation {
  366. faultLabel = f(data)
  367. if faultLabel != "" {
  368. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  369. break
  370. }
  371. }
  372. subscribersMutexes[i].Unlock()
  373. }
  374. subscribersTimeMutexes[i].Unlock()
  375. },
  376. })
  377. }
  378. // 13
  379. if topic == masterConfig.TopicOfCloudClusters && len(masterConfig.RuleOfCloudClusters) > 0 {
  380. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  381. Node: commonConfig.RosNode,
  382. Topic: topic,
  383. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  384. subscribersTimeMutexes[i].Lock()
  385. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  386. subscribersMutexes[i].Lock()
  387. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  388. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  389. var faultLabel string
  390. for _, f := range masterConfig.RuleOfCloudClusters {
  391. faultLabel = f(data)
  392. if faultLabel != "" {
  393. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  394. subscribersTimes[i] = time.Now()
  395. break
  396. }
  397. }
  398. subscribersMutexes[i].Unlock()
  399. }
  400. subscribersTimeMutexes[i].Unlock()
  401. },
  402. })
  403. }
  404. // 14
  405. if topic == masterConfig.TopicOfHeartbeatInfo && len(masterConfig.RuleOfHeartbeatInfo) > 0 {
  406. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  407. Node: commonConfig.RosNode,
  408. Topic: topic,
  409. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  410. subscribersTimeMutexes[i].Lock()
  411. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  412. subscribersMutexes[i].Lock()
  413. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  414. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  415. var faultLabel string
  416. for _, f := range masterConfig.RuleOfHeartbeatInfo {
  417. faultLabel = f(data)
  418. if faultLabel != "" {
  419. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  420. subscribersTimes[i] = time.Now()
  421. break
  422. }
  423. }
  424. subscribersMutexes[i].Unlock()
  425. }
  426. subscribersTimeMutexes[i].Unlock()
  427. },
  428. })
  429. }
  430. // 15
  431. if topic == masterConfig.TopicOfLidarPretreatmentCost && len(masterConfig.RuleOfLidarPretreatmentCost) > 0 {
  432. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  433. Node: commonConfig.RosNode,
  434. Topic: topic,
  435. Callback: func(data *geometry_msgs.Vector3Stamped) {
  436. subscribersTimeMutexes[i].Lock()
  437. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  438. subscribersMutexes[i].Lock()
  439. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  440. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  441. var faultLabel string
  442. for _, f := range masterConfig.RuleOfLidarPretreatmentCost {
  443. faultLabel = f(data)
  444. if faultLabel != "" {
  445. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  446. subscribersTimes[i] = time.Now()
  447. break
  448. }
  449. }
  450. subscribersMutexes[i].Unlock()
  451. }
  452. subscribersTimeMutexes[i].Unlock()
  453. },
  454. })
  455. }
  456. // 16
  457. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && len(masterConfig.RuleOfLidarPretreatmentOdometry) > 0 {
  458. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  459. Node: commonConfig.RosNode,
  460. Topic: topic,
  461. Callback: func(data *nav_msgs.Odometry) {
  462. subscribersTimeMutexes[i].Lock()
  463. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  464. subscribersMutexes[i].Lock()
  465. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  466. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  467. var faultLabel string
  468. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry {
  469. faultLabel = f(data)
  470. if faultLabel != "" {
  471. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  472. subscribersTimes[i] = time.Now()
  473. break
  474. }
  475. }
  476. subscribersMutexes[i].Unlock()
  477. }
  478. subscribersTimeMutexes[i].Unlock()
  479. },
  480. })
  481. }
  482. // 17
  483. if topic == masterConfig.TopicOfLidarRoi && len(masterConfig.RuleOfLidarRoi) > 0 {
  484. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  485. Node: commonConfig.RosNode,
  486. Topic: topic,
  487. Callback: func(data *geometry_msgs.PolygonStamped) {
  488. subscribersTimeMutexes[i].Lock()
  489. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  490. subscribersMutexes[i].Lock()
  491. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  492. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  493. var faultLabel string
  494. for _, f := range masterConfig.RuleOfLidarRoi {
  495. faultLabel = f(data)
  496. if faultLabel != "" {
  497. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  498. subscribersTimes[i] = time.Now()
  499. break
  500. }
  501. }
  502. subscribersMutexes[i].Unlock()
  503. }
  504. subscribersTimeMutexes[i].Unlock()
  505. },
  506. })
  507. }
  508. // 18
  509. if topic == masterConfig.TopicOfLine1 && len(masterConfig.RuleOfLine1) > 0 {
  510. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  511. Node: commonConfig.RosNode,
  512. Topic: topic,
  513. Callback: func(data *nav_msgs.Path) {
  514. subscribersTimeMutexes[i].Lock()
  515. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  516. subscribersMutexes[i].Lock()
  517. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  518. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  519. var faultLabel string
  520. for _, f := range masterConfig.RuleOfLine1 {
  521. faultLabel = f(data)
  522. if faultLabel != "" {
  523. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  524. subscribersTimes[i] = time.Now()
  525. break
  526. }
  527. }
  528. subscribersMutexes[i].Unlock()
  529. }
  530. subscribersTimeMutexes[i].Unlock()
  531. },
  532. })
  533. }
  534. // 19
  535. if topic == masterConfig.TopicOfLine2 && len(masterConfig.RuleOfLine2) > 0 {
  536. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  537. Node: commonConfig.RosNode,
  538. Topic: topic,
  539. Callback: func(data *nav_msgs.Path) {
  540. subscribersTimeMutexes[i].Lock()
  541. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  542. subscribersMutexes[i].Lock()
  543. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  544. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  545. var faultLabel string
  546. for _, f := range masterConfig.RuleOfLine2 {
  547. faultLabel = f(data)
  548. if faultLabel != "" {
  549. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  550. subscribersTimes[i] = time.Now()
  551. break
  552. }
  553. }
  554. subscribersMutexes[i].Unlock()
  555. }
  556. subscribersTimeMutexes[i].Unlock()
  557. },
  558. })
  559. }
  560. // 20
  561. if topic == masterConfig.TopicOfMapPolygon && len(masterConfig.RuleOfMapPolygon) > 0 {
  562. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  563. Node: commonConfig.RosNode,
  564. Topic: topic,
  565. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  566. subscribersTimeMutexes[i].Lock()
  567. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  568. subscribersMutexes[i].Lock()
  569. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  570. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  571. var faultLabel string
  572. for _, f := range masterConfig.RuleOfMapPolygon {
  573. faultLabel = f(data)
  574. if faultLabel != "" {
  575. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  576. subscribersTimes[i] = time.Now()
  577. break
  578. }
  579. }
  580. subscribersMutexes[i].Unlock()
  581. }
  582. subscribersTimeMutexes[i].Unlock()
  583. },
  584. })
  585. }
  586. // 21
  587. if topic == masterConfig.TopicOfObstacleDisplay && len(masterConfig.RuleOfObstacleDisplay) > 0 {
  588. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  589. Node: commonConfig.RosNode,
  590. Topic: topic,
  591. Callback: func(data *visualization_msgs.MarkerArray) {
  592. subscribersTimeMutexes[i].Lock()
  593. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  594. subscribersMutexes[i].Lock()
  595. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  596. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  597. var faultLabel string
  598. for _, f := range masterConfig.RuleOfObstacleDisplay {
  599. faultLabel = f(data)
  600. if faultLabel != "" {
  601. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  602. subscribersTimes[i] = time.Now()
  603. break
  604. }
  605. }
  606. subscribersMutexes[i].Unlock()
  607. }
  608. subscribersTimeMutexes[i].Unlock()
  609. },
  610. })
  611. }
  612. // 22
  613. if topic == masterConfig.TopicOfPjControlPub && len(masterConfig.RuleOfPjControlPub) > 0 {
  614. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  615. Node: commonConfig.RosNode,
  616. Topic: topic,
  617. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  618. // 更新共享变量
  619. mutexOfPjControlPub.RLock()
  620. {
  621. extendParam.NumCountPjiControlCommandOfPjControlPub++
  622. if extendParam.NumCountPjiControlCommandOfPjControlPub == 10 {
  623. extendParam.EgoSteeringCmdOfPjControlPub = append(extendParam.EgoSteeringCmdOfPjControlPub, data.ICPVCmdStrAngle)
  624. extendParam.EgoThrottleCmdOfPjControlPub = append(extendParam.EgoThrottleCmdOfPjControlPub, data.ICPVCmdAccPelPosAct)
  625. extendParam.NumCountPjiControlCommandOfPjControlPub = 0
  626. }
  627. }
  628. mutexOfPjControlPub.RUnlock()
  629. subscribersTimeMutexes[i].Lock()
  630. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  631. subscribersMutexes[i].Lock()
  632. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  633. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  634. var faultLabel string
  635. for _, f := range masterConfig.RuleOfPjControlPub {
  636. faultLabel = f(data)
  637. if faultLabel != "" {
  638. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  639. subscribersTimes[i] = time.Now()
  640. break
  641. }
  642. }
  643. subscribersMutexes[i].Unlock()
  644. }
  645. subscribersTimeMutexes[i].Unlock()
  646. },
  647. })
  648. }
  649. // 23
  650. if topic == masterConfig.TopicOfPointsCluster && len(masterConfig.RuleOfPointsCluster) > 0 {
  651. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  652. Node: commonConfig.RosNode,
  653. Topic: topic,
  654. Callback: func(data *sensor_msgs.PointCloud2) {
  655. subscribersTimeMutexes[i].Lock()
  656. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  657. subscribersMutexes[i].Lock()
  658. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  659. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  660. var faultLabel string
  661. for _, f := range masterConfig.RuleOfPointsCluster {
  662. faultLabel = f(data)
  663. if faultLabel != "" {
  664. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  665. subscribersTimes[i] = time.Now()
  666. break
  667. }
  668. }
  669. subscribersMutexes[i].Unlock()
  670. }
  671. subscribersTimeMutexes[i].Unlock()
  672. },
  673. })
  674. }
  675. // 24
  676. if topic == masterConfig.TopicOfPointsConcat && len(masterConfig.RuleOfPointsConcat) > 0 {
  677. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  678. Node: commonConfig.RosNode,
  679. Topic: topic,
  680. Callback: func(data *sensor_msgs.PointCloud2) {
  681. subscribersTimeMutexes[i].Lock()
  682. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  683. subscribersMutexes[i].Lock()
  684. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  685. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  686. var faultLabel string
  687. for _, f := range masterConfig.RuleOfPointsConcat {
  688. faultLabel = f(data)
  689. if faultLabel != "" {
  690. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  691. subscribersTimes[i] = time.Now()
  692. break
  693. }
  694. }
  695. subscribersMutexes[i].Unlock()
  696. }
  697. subscribersTimeMutexes[i].Unlock()
  698. },
  699. })
  700. }
  701. // 25
  702. if topic == masterConfig.TopicOfReferenceDisplay && len(masterConfig.RuleOfReferenceDisplay) > 0 {
  703. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  704. Node: commonConfig.RosNode,
  705. Topic: topic,
  706. Callback: func(data *nav_msgs.Path) {
  707. subscribersTimeMutexes[i].Lock()
  708. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  709. subscribersMutexes[i].Lock()
  710. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  711. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  712. var faultLabel string
  713. for _, f := range masterConfig.RuleOfReferenceDisplay {
  714. faultLabel = f(data)
  715. if faultLabel != "" {
  716. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  717. subscribersTimes[i] = time.Now()
  718. break
  719. }
  720. }
  721. subscribersMutexes[i].Unlock()
  722. }
  723. subscribersTimeMutexes[i].Unlock()
  724. },
  725. })
  726. }
  727. // 26
  728. if topic == masterConfig.TopicOfReferenceTrajectory && len(masterConfig.RuleOfReferenceTrajectory) > 0 {
  729. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  730. Node: commonConfig.RosNode,
  731. Topic: topic,
  732. Callback: func(data *pjisuv_msgs.Trajectory) {
  733. subscribersTimeMutexes[i].Lock()
  734. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  735. subscribersMutexes[i].Lock()
  736. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  737. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  738. var faultLabel string
  739. for _, f := range masterConfig.RuleOfReferenceTrajectory {
  740. faultLabel = f(data)
  741. if faultLabel != "" {
  742. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  743. subscribersTimes[i] = time.Now()
  744. break
  745. }
  746. }
  747. subscribersMutexes[i].Unlock()
  748. }
  749. subscribersTimeMutexes[i].Unlock()
  750. },
  751. })
  752. }
  753. // 27
  754. if topic == masterConfig.TopicOfRoiPoints && len(masterConfig.RuleOfRoiPoints) > 0 {
  755. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  756. Node: commonConfig.RosNode,
  757. Topic: topic,
  758. Callback: func(data *sensor_msgs.PointCloud2) {
  759. subscribersTimeMutexes[i].Lock()
  760. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  761. subscribersMutexes[i].Lock()
  762. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  763. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  764. var faultLabel string
  765. for _, f := range masterConfig.RuleOfRoiPoints {
  766. faultLabel = f(data)
  767. if faultLabel != "" {
  768. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  769. subscribersTimes[i] = time.Now()
  770. break
  771. }
  772. }
  773. subscribersMutexes[i].Unlock()
  774. }
  775. subscribersTimeMutexes[i].Unlock()
  776. },
  777. })
  778. }
  779. // 28
  780. if topic == masterConfig.TopicOfRoiPolygon && len(masterConfig.RuleOfRoiPolygon) > 0 {
  781. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  782. Node: commonConfig.RosNode,
  783. Topic: topic,
  784. Callback: func(data *nav_msgs.Path) {
  785. subscribersTimeMutexes[i].Lock()
  786. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  787. subscribersMutexes[i].Lock()
  788. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  789. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  790. var faultLabel string
  791. for _, f := range masterConfig.RuleOfRoiPolygon {
  792. faultLabel = f(data)
  793. if faultLabel != "" {
  794. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  795. subscribersTimes[i] = time.Now()
  796. break
  797. }
  798. }
  799. subscribersMutexes[i].Unlock()
  800. }
  801. subscribersTimeMutexes[i].Unlock()
  802. },
  803. })
  804. }
  805. // 29
  806. if topic == masterConfig.TopicOfTf && len(masterConfig.RuleOfTf) > 0 {
  807. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  808. Node: commonConfig.RosNode,
  809. Topic: topic,
  810. Callback: func(data *tf2_msgs.TFMessage) {
  811. subscribersTimeMutexes[i].Lock()
  812. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  813. subscribersMutexes[i].Lock()
  814. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  815. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  816. var faultLabel string
  817. for _, f := range masterConfig.RuleOfTf {
  818. faultLabel = f(data)
  819. if faultLabel != "" {
  820. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  821. subscribersTimes[i] = time.Now()
  822. break
  823. }
  824. }
  825. subscribersMutexes[i].Unlock()
  826. }
  827. subscribersTimeMutexes[i].Unlock()
  828. },
  829. })
  830. }
  831. // 30
  832. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  833. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  834. Node: commonConfig.RosNode,
  835. Topic: topic,
  836. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  837. // 更新共享变量
  838. mutexOfTpperception.RLock()
  839. {
  840. for _, obj := range data.Objs {
  841. if obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  842. continue
  843. }
  844. if _, ok := extendParam.ObjDicOfTpperception[obj.Id]; !ok {
  845. extendParam.ObjDicOfTpperception[obj.Id] = []float32{}
  846. }
  847. extendParam.ObjDicOfTpperception[obj.Id] = append(extendParam.ObjDicOfTpperception[obj.Id], obj.Y)
  848. }
  849. }
  850. mutexOfTpperception.RUnlock()
  851. subscribersTimeMutexes[i].Lock()
  852. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  853. subscribersMutexes[i].Lock()
  854. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  855. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  856. var faultLabel string
  857. for _, f := range masterConfig.RuleOfTpperception {
  858. faultLabel = f(data, extendParam)
  859. if faultLabel != "" {
  860. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  861. break
  862. }
  863. }
  864. subscribersMutexes[i].Unlock()
  865. }
  866. subscribersTimeMutexes[i].Unlock()
  867. },
  868. })
  869. }
  870. // 31
  871. if topic == masterConfig.TopicOfTpperceptionVis && len(masterConfig.RuleOfTpperceptionVis) > 0 {
  872. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  873. Node: commonConfig.RosNode,
  874. Topic: topic,
  875. Callback: func(data *visualization_msgs.MarkerArray) {
  876. subscribersTimeMutexes[i].Lock()
  877. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  878. subscribersMutexes[i].Lock()
  879. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  880. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  881. var faultLabel string
  882. for _, f := range masterConfig.RuleOfTpperceptionVis {
  883. faultLabel = f(data)
  884. if faultLabel != "" {
  885. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  886. subscribersTimes[i] = time.Now()
  887. break
  888. }
  889. }
  890. subscribersMutexes[i].Unlock()
  891. }
  892. subscribersTimeMutexes[i].Unlock()
  893. },
  894. })
  895. }
  896. // 32
  897. if topic == masterConfig.TopicOfTprouteplan && len(masterConfig.RuleOfTprouteplan) > 0 {
  898. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  899. Node: commonConfig.RosNode,
  900. Topic: topic,
  901. Callback: func(data *pjisuv_msgs.RoutePlan) {
  902. subscribersTimeMutexes[i].Lock()
  903. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  904. subscribersMutexes[i].Lock()
  905. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  906. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  907. var faultLabel string
  908. for _, f := range masterConfig.RuleOfTprouteplan {
  909. faultLabel = f(data)
  910. if faultLabel != "" {
  911. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  912. subscribersTimes[i] = time.Now()
  913. break
  914. }
  915. }
  916. subscribersMutexes[i].Unlock()
  917. }
  918. subscribersTimeMutexes[i].Unlock()
  919. },
  920. })
  921. }
  922. // 33
  923. if topic == masterConfig.TopicOfTrajectoryDisplay && len(masterConfig.RuleOfTrajectoryDisplay) > 0 {
  924. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  925. Node: commonConfig.RosNode,
  926. Topic: topic,
  927. Callback: func(data *nav_msgs.Path) {
  928. subscribersTimeMutexes[i].Lock()
  929. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  930. subscribersMutexes[i].Lock()
  931. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  932. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  933. var faultLabel string
  934. for _, f := range masterConfig.RuleOfTrajectoryDisplay {
  935. faultLabel = f(data)
  936. if faultLabel != "" {
  937. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  938. subscribersTimes[i] = time.Now()
  939. break
  940. }
  941. }
  942. subscribersMutexes[i].Unlock()
  943. }
  944. subscribersTimeMutexes[i].Unlock()
  945. },
  946. })
  947. }
  948. // 34
  949. if topic == masterConfig.TopicOfUngroundCloudpoints && len(masterConfig.RuleOfUngroundCloudpoints) > 0 {
  950. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  951. Node: commonConfig.RosNode,
  952. Topic: topic,
  953. Callback: func(data *sensor_msgs.PointCloud2) {
  954. subscribersTimeMutexes[i].Lock()
  955. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  956. subscribersMutexes[i].Lock()
  957. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  958. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  959. var faultLabel string
  960. for _, f := range masterConfig.RuleOfUngroundCloudpoints {
  961. faultLabel = f(data)
  962. if faultLabel != "" {
  963. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  964. subscribersTimes[i] = time.Now()
  965. break
  966. }
  967. }
  968. subscribersMutexes[i].Unlock()
  969. }
  970. subscribersTimeMutexes[i].Unlock()
  971. },
  972. })
  973. }
  974. // 35
  975. if topic == masterConfig.TopicOfCameraImage && len(masterConfig.RuleOfCameraImage) > 0 {
  976. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  977. Node: commonConfig.RosNode,
  978. Topic: topic,
  979. Callback: func(data *sensor_msgs.Image) {
  980. subscribersTimeMutexes[i].Lock()
  981. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  982. subscribersMutexes[i].Lock()
  983. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  984. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  985. var faultLabel string
  986. for _, f := range masterConfig.RuleOfCameraImage {
  987. faultLabel = f(data)
  988. if faultLabel != "" {
  989. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  990. subscribersTimes[i] = time.Now()
  991. break
  992. }
  993. }
  994. subscribersMutexes[i].Unlock()
  995. }
  996. subscribersTimeMutexes[i].Unlock()
  997. },
  998. })
  999. }
  1000. // 36
  1001. if topic == masterConfig.TopicOfDataRead && len(masterConfig.RuleOfDataRead) > 0 {
  1002. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1003. Node: commonConfig.RosNode,
  1004. Topic: topic,
  1005. Callback: func(data *pjisuv_msgs.Retrieval) {
  1006. // 更新共享变量
  1007. mutexOfDataRead.RLock()
  1008. {
  1009. extendParam.NumCountDataReadOfDataRead++
  1010. if extendParam.NumCountDataReadOfDataRead == 10 {
  1011. extendParam.EgoSteeringRealOfDataRead = append(extendParam.EgoSteeringRealOfDataRead, data.ActStrWhAng)
  1012. extendParam.EgoThrottleRealOfDataRead = append(extendParam.EgoThrottleRealOfDataRead, data.AccPed2)
  1013. extendParam.NumCountDataReadOfDataRead = 0
  1014. }
  1015. extendParam.StrgAngleRealValueOfDataRead = data.ActStrWhAng
  1016. }
  1017. mutexOfDataRead.RUnlock()
  1018. subscribersTimeMutexes[i].Lock()
  1019. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1020. subscribersMutexes[i].Lock()
  1021. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1022. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1023. var faultLabel string
  1024. for _, f := range masterConfig.RuleOfDataRead {
  1025. faultLabel = f(data)
  1026. if faultLabel != "" {
  1027. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1028. subscribersTimes[i] = time.Now()
  1029. break
  1030. }
  1031. }
  1032. subscribersMutexes[i].Unlock()
  1033. }
  1034. subscribersTimeMutexes[i].Unlock()
  1035. },
  1036. })
  1037. }
  1038. // 37
  1039. if topic == masterConfig.TopicOfPjiGps && len(masterConfig.RuleOfPjiGps) > 0 {
  1040. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1041. Node: commonConfig.RosNode,
  1042. Topic: topic,
  1043. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  1044. subscribersTimeMutexes[i].Lock()
  1045. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1046. subscribersMutexes[i].Lock()
  1047. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1048. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1049. var faultLabel string
  1050. for _, f := range masterConfig.RuleOfPjiGps {
  1051. faultLabel = f(data)
  1052. if faultLabel != "" {
  1053. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1054. subscribersTimes[i] = time.Now()
  1055. break
  1056. }
  1057. }
  1058. subscribersMutexes[i].Unlock()
  1059. }
  1060. subscribersTimeMutexes[i].Unlock()
  1061. },
  1062. })
  1063. }
  1064. if err != nil {
  1065. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1066. //TODO 如何回传日志
  1067. continue
  1068. }
  1069. }
  1070. select {
  1071. case signal := <-service.ChannelKillWindowProducer:
  1072. if signal == 1 {
  1073. commonConfig.RosNode.Close()
  1074. service.AddKillTimes("3")
  1075. return
  1076. }
  1077. }
  1078. }
  1079. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  1080. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1081. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  1082. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  1083. newTimeWindow := commonEntity.TimeWindow{
  1084. FaultTime: faultHappenTime,
  1085. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  1086. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  1087. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  1088. Labels: []string{faultLabel},
  1089. MasterTopics: masterTopics,
  1090. SlaveTopics: slaveTopics,
  1091. }
  1092. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1093. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1094. } else {
  1095. // 2-2 如果在旧故障窗口内
  1096. commonEntity.TimeWindowProducerQueueMutex.RLock()
  1097. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  1098. // 2-2-1 更新故障窗口end时间
  1099. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  1100. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1101. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  1102. lastTimeWindow.TimeWindowEnd = maxEnd
  1103. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  1104. } else {
  1105. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1106. lastTimeWindow.TimeWindowEnd = expectEnd
  1107. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1108. }
  1109. }
  1110. // 2-2-2 更新label
  1111. labels := lastTimeWindow.Labels
  1112. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1113. // 2-2-3 更新 topic
  1114. sourceMasterTopics := lastTimeWindow.MasterTopics
  1115. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1116. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1117. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1118. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1119. }
  1120. }
  1121. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1122. // 获取所有需要采集的topic
  1123. var faultCodeTopics []string
  1124. for _, code := range commonConfig.CloudConfig.Triggers {
  1125. if code.Label == faultLabel {
  1126. faultCodeTopics = code.Topics
  1127. }
  1128. }
  1129. // 根据不同节点采集的topic进行分配采集
  1130. for _, acceptTopic := range faultCodeTopics {
  1131. for _, host := range commonConfig.CloudConfig.Hosts {
  1132. for _, topic := range host.Topics {
  1133. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1134. masterTopics = append(masterTopics, acceptTopic)
  1135. }
  1136. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1137. slaveTopics = append(slaveTopics, acceptTopic)
  1138. }
  1139. }
  1140. }
  1141. }
  1142. return masterTopics, slaveTopics
  1143. }