produce_window.go 40 KB


  1. package svc
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "github.com/bluenviron/goroslib/v2"
  11. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. m sync.RWMutex
  21. velocityX float64
  22. velocityY float64
  23. yaw float64
  24. )
  25. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  26. func PrepareTimeWindowProducerQueue() {
  27. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  28. var err error
  29. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  30. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  31. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  32. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  33. for i, topic := range commonConfig.SubscribeTopics {
  34. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  35. // 1
  36. if topic == masterConfig.TopicOfAmrPose && len(masterConfig.RuleOfAmrPose) > 0 {
  37. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  38. Node: commonConfig.RosNode,
  39. Topic: topic,
  40. Callback: func(data *visualization_msgs.MarkerArray) {
  41. subscribersTimeMutexes[i].Lock()
  42. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  43. subscribersMutexes[i].Lock()
  44. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  45. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  46. var faultLabel string
  47. for _, f := range masterConfig.RuleOfAmrPose {
  48. faultLabel = f(data)
  49. if faultLabel != "" {
  50. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  51. subscribersTimes[i] = time.Now()
  52. break
  53. }
  54. }
  55. subscribersMutexes[i].Unlock()
  56. }
  57. subscribersTimeMutexes[i].Unlock()
  58. },
  59. })
  60. }
  61. // 2
  62. if topic == masterConfig.TopicOfBoundingBoxesFast && len(masterConfig.RuleOfBoundingBoxesFast) > 0 {
  63. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  64. Node: commonConfig.RosNode,
  65. Topic: topic,
  66. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  67. subscribersTimeMutexes[i].Lock()
  68. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  69. subscribersMutexes[i].Lock()
  70. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  71. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  72. var faultLabel string
  73. for _, f := range masterConfig.RuleOfBoundingBoxesFast {
  74. faultLabel = f(data)
  75. if faultLabel != "" {
  76. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  77. subscribersTimes[i] = time.Now()
  78. break
  79. }
  80. }
  81. subscribersMutexes[i].Unlock()
  82. }
  83. subscribersTimeMutexes[i].Unlock()
  84. },
  85. })
  86. }
  87. // 3
  88. if topic == masterConfig.TopicOfCameraFault && len(masterConfig.RuleOfCameraFault) > 0 {
  89. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  90. Node: commonConfig.RosNode,
  91. Topic: topic,
  92. Callback: func(data *pjisuv_msgs.FaultVec) {
  93. subscribersTimeMutexes[i].Lock()
  94. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  95. subscribersMutexes[i].Lock()
  96. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  97. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  98. var faultLabel string
  99. for _, f := range masterConfig.RuleOfCameraFault {
  100. faultLabel = f(data)
  101. if faultLabel != "" {
  102. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  103. subscribersTimes[i] = time.Now()
  104. break
  105. }
  106. }
  107. subscribersMutexes[i].Unlock()
  108. }
  109. subscribersTimeMutexes[i].Unlock()
  110. },
  111. })
  112. }
  113. // 4
  114. if topic == masterConfig.TopicOfCanData && len(masterConfig.RuleOfCanData) > 0 {
  115. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  116. Node: commonConfig.RosNode,
  117. Topic: topic,
  118. Callback: func(data *pjisuv_msgs.Frame) {
  119. subscribersTimeMutexes[i].Lock()
  120. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  121. subscribersMutexes[i].Lock()
  122. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  123. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  124. var faultLabel string
  125. for _, f := range masterConfig.RuleOfCanData {
  126. faultLabel = f(data)
  127. if faultLabel != "" {
  128. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  129. subscribersTimes[i] = time.Now()
  130. break
  131. }
  132. }
  133. subscribersMutexes[i].Unlock()
  134. }
  135. subscribersTimeMutexes[i].Unlock()
  136. },
  137. })
  138. }
  139. // 5
  140. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && len(masterConfig.RuleOfCh128x1LslidarPointCloud) > 0 {
  141. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  142. Node: commonConfig.RosNode,
  143. Topic: topic,
  144. Callback: func(data *sensor_msgs.PointCloud2) {
  145. subscribersTimeMutexes[i].Lock()
  146. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  147. subscribersMutexes[i].Lock()
  148. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  149. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  150. var faultLabel string
  151. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud {
  152. faultLabel = f(data)
  153. if faultLabel != "" {
  154. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  155. subscribersTimes[i] = time.Now()
  156. break
  157. }
  158. }
  159. subscribersMutexes[i].Unlock()
  160. }
  161. subscribersTimeMutexes[i].Unlock()
  162. },
  163. })
  164. }
  165. // 6
  166. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && len(masterConfig.RuleOfCh64wLLslidarPointCloud) > 0 {
  167. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  168. Node: commonConfig.RosNode,
  169. Topic: topic,
  170. Callback: func(data *sensor_msgs.PointCloud2) {
  171. subscribersTimeMutexes[i].Lock()
  172. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  173. subscribersMutexes[i].Lock()
  174. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  175. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  176. var faultLabel string
  177. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud {
  178. faultLabel = f(data)
  179. if faultLabel != "" {
  180. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  181. subscribersTimes[i] = time.Now()
  182. break
  183. }
  184. }
  185. subscribersMutexes[i].Unlock()
  186. }
  187. subscribersTimeMutexes[i].Unlock()
  188. },
  189. })
  190. }
  191. // 7
  192. if topic == masterConfig.TopicOfCh64wLScan && len(masterConfig.RuleOfCh64wLScan) > 0 {
  193. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  194. Node: commonConfig.RosNode,
  195. Topic: topic,
  196. Callback: func(data *sensor_msgs.LaserScan) {
  197. subscribersTimeMutexes[i].Lock()
  198. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  199. subscribersMutexes[i].Lock()
  200. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  201. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  202. var faultLabel string
  203. for _, f := range masterConfig.RuleOfCh64wLScan {
  204. faultLabel = f(data)
  205. if faultLabel != "" {
  206. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  207. subscribersTimes[i] = time.Now()
  208. break
  209. }
  210. }
  211. subscribersMutexes[i].Unlock()
  212. }
  213. subscribersTimeMutexes[i].Unlock()
  214. },
  215. })
  216. }
  217. // 8
  218. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && len(masterConfig.RuleOfCh64wRLslidarPointCloud) > 0 {
  219. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  220. Node: commonConfig.RosNode,
  221. Topic: topic,
  222. Callback: func(data *sensor_msgs.PointCloud2) {
  223. subscribersTimeMutexes[i].Lock()
  224. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  225. subscribersMutexes[i].Lock()
  226. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  227. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  228. var faultLabel string
  229. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud {
  230. faultLabel = f(data)
  231. if faultLabel != "" {
  232. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  233. subscribersTimes[i] = time.Now()
  234. break
  235. }
  236. }
  237. subscribersMutexes[i].Unlock()
  238. }
  239. subscribersTimeMutexes[i].Unlock()
  240. },
  241. })
  242. }
  243. // 9
  244. if topic == masterConfig.TopicOfCh64wRScan && len(masterConfig.RuleOfCh64wRScan) > 0 {
  245. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  246. Node: commonConfig.RosNode,
  247. Topic: topic,
  248. Callback: func(data *sensor_msgs.LaserScan) {
  249. subscribersTimeMutexes[i].Lock()
  250. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  251. subscribersMutexes[i].Lock()
  252. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  253. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  254. var faultLabel string
  255. for _, f := range masterConfig.RuleOfCh64wRScan {
  256. faultLabel = f(data)
  257. if faultLabel != "" {
  258. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  259. subscribersTimes[i] = time.Now()
  260. break
  261. }
  262. }
  263. subscribersMutexes[i].Unlock()
  264. }
  265. subscribersTimeMutexes[i].Unlock()
  266. },
  267. })
  268. }
  269. // 10
  270. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && len(masterConfig.RuleOfCicvLidarclusterMovingObjects) > 0 {
  271. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  272. Node: commonConfig.RosNode,
  273. Topic: topic,
  274. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  275. subscribersTimeMutexes[i].Lock()
  276. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  277. subscribersMutexes[i].Lock()
  278. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  279. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  280. var faultLabel string
  281. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects {
  282. faultLabel = f(data)
  283. if faultLabel != "" {
  284. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  285. subscribersTimes[i] = time.Now()
  286. break
  287. }
  288. }
  289. subscribersMutexes[i].Unlock()
  290. }
  291. subscribersTimeMutexes[i].Unlock()
  292. },
  293. })
  294. }
  295. // 11
  296. if topic == masterConfig.TopicOfCicvAmrTrajectory && len(masterConfig.RuleOfCicvAmrTrajectory) > 0 {
  297. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  298. Node: commonConfig.RosNode,
  299. Topic: topic,
  300. Callback: func(data *pjisuv_msgs.Trajectory) {
  301. subscribersTimeMutexes[i].Lock()
  302. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  303. subscribersMutexes[i].Lock()
  304. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  305. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  306. var faultLabel string
  307. for _, f := range masterConfig.RuleOfCicvAmrTrajectory {
  308. faultLabel = f(data)
  309. if faultLabel != "" {
  310. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  311. subscribersTimes[i] = time.Now()
  312. break
  313. }
  314. }
  315. subscribersMutexes[i].Unlock()
  316. }
  317. subscribersTimeMutexes[i].Unlock()
  318. },
  319. })
  320. }
  321. // 12
  322. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  323. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  324. Node: commonConfig.RosNode,
  325. Topic: topic,
  326. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  327. // 更新共享变量
  328. m.RLock()
  329. velocityX = data.VelocityX
  330. velocityY = data.VelocityY
  331. yaw = data.Yaw
  332. m.RUnlock()
  333. subscribersTimeMutexes[i].Lock()
  334. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  335. subscribersMutexes[i].Lock()
  336. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  337. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  338. var faultLabel string
  339. for _, f := range masterConfig.RuleOfCicvLocation {
  340. faultLabel = f(data)
  341. if faultLabel != "" {
  342. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  343. break
  344. }
  345. }
  346. subscribersMutexes[i].Unlock()
  347. }
  348. subscribersTimeMutexes[i].Unlock()
  349. },
  350. })
  351. }
  352. // 13
  353. if topic == masterConfig.TopicOfCloudClusters && len(masterConfig.RuleOfCloudClusters) > 0 {
  354. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  355. Node: commonConfig.RosNode,
  356. Topic: topic,
  357. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  358. subscribersTimeMutexes[i].Lock()
  359. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  360. subscribersMutexes[i].Lock()
  361. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  362. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  363. var faultLabel string
  364. for _, f := range masterConfig.RuleOfCloudClusters {
  365. faultLabel = f(data)
  366. if faultLabel != "" {
  367. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  368. subscribersTimes[i] = time.Now()
  369. break
  370. }
  371. }
  372. subscribersMutexes[i].Unlock()
  373. }
  374. subscribersTimeMutexes[i].Unlock()
  375. },
  376. })
  377. }
  378. // 14
  379. if topic == masterConfig.TopicOfHeartbeatInfo && len(masterConfig.RuleOfHeartbeatInfo) > 0 {
  380. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  381. Node: commonConfig.RosNode,
  382. Topic: topic,
  383. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  384. subscribersTimeMutexes[i].Lock()
  385. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  386. subscribersMutexes[i].Lock()
  387. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  388. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  389. var faultLabel string
  390. for _, f := range masterConfig.RuleOfHeartbeatInfo {
  391. faultLabel = f(data)
  392. if faultLabel != "" {
  393. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  394. subscribersTimes[i] = time.Now()
  395. break
  396. }
  397. }
  398. subscribersMutexes[i].Unlock()
  399. }
  400. subscribersTimeMutexes[i].Unlock()
  401. },
  402. })
  403. }
  404. // 15
  405. if topic == masterConfig.TopicOfLidarPretreatmentCost && len(masterConfig.RuleOfLidarPretreatmentCost) > 0 {
  406. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  407. Node: commonConfig.RosNode,
  408. Topic: topic,
  409. Callback: func(data *geometry_msgs.Vector3Stamped) {
  410. subscribersTimeMutexes[i].Lock()
  411. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  412. subscribersMutexes[i].Lock()
  413. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  414. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  415. var faultLabel string
  416. for _, f := range masterConfig.RuleOfLidarPretreatmentCost {
  417. faultLabel = f(data)
  418. if faultLabel != "" {
  419. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  420. subscribersTimes[i] = time.Now()
  421. break
  422. }
  423. }
  424. subscribersMutexes[i].Unlock()
  425. }
  426. subscribersTimeMutexes[i].Unlock()
  427. },
  428. })
  429. }
  430. // 16
  431. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && len(masterConfig.RuleOfLidarPretreatmentOdometry) > 0 {
  432. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  433. Node: commonConfig.RosNode,
  434. Topic: topic,
  435. Callback: func(data *nav_msgs.Odometry) {
  436. subscribersTimeMutexes[i].Lock()
  437. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  438. subscribersMutexes[i].Lock()
  439. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  440. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  441. var faultLabel string
  442. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry {
  443. faultLabel = f(data)
  444. if faultLabel != "" {
  445. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  446. subscribersTimes[i] = time.Now()
  447. break
  448. }
  449. }
  450. subscribersMutexes[i].Unlock()
  451. }
  452. subscribersTimeMutexes[i].Unlock()
  453. },
  454. })
  455. }
  456. // 17
  457. if topic == masterConfig.TopicOfLidarRoi && len(masterConfig.RuleOfLidarRoi) > 0 {
  458. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  459. Node: commonConfig.RosNode,
  460. Topic: topic,
  461. Callback: func(data *geometry_msgs.PolygonStamped) {
  462. subscribersTimeMutexes[i].Lock()
  463. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  464. subscribersMutexes[i].Lock()
  465. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  466. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  467. var faultLabel string
  468. for _, f := range masterConfig.RuleOfLidarRoi {
  469. faultLabel = f(data)
  470. if faultLabel != "" {
  471. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  472. subscribersTimes[i] = time.Now()
  473. break
  474. }
  475. }
  476. subscribersMutexes[i].Unlock()
  477. }
  478. subscribersTimeMutexes[i].Unlock()
  479. },
  480. })
  481. }
  482. // 18
  483. if topic == masterConfig.TopicOfLine1 && len(masterConfig.RuleOfLine1) > 0 {
  484. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  485. Node: commonConfig.RosNode,
  486. Topic: topic,
  487. Callback: func(data *nav_msgs.Path) {
  488. subscribersTimeMutexes[i].Lock()
  489. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  490. subscribersMutexes[i].Lock()
  491. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  492. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  493. var faultLabel string
  494. for _, f := range masterConfig.RuleOfLine1 {
  495. faultLabel = f(data)
  496. if faultLabel != "" {
  497. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  498. subscribersTimes[i] = time.Now()
  499. break
  500. }
  501. }
  502. subscribersMutexes[i].Unlock()
  503. }
  504. subscribersTimeMutexes[i].Unlock()
  505. },
  506. })
  507. }
  508. // 19
  509. if topic == masterConfig.TopicOfLine2 && len(masterConfig.RuleOfLine2) > 0 {
  510. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  511. Node: commonConfig.RosNode,
  512. Topic: topic,
  513. Callback: func(data *nav_msgs.Path) {
  514. subscribersTimeMutexes[i].Lock()
  515. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  516. subscribersMutexes[i].Lock()
  517. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  518. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  519. var faultLabel string
  520. for _, f := range masterConfig.RuleOfLine2 {
  521. faultLabel = f(data)
  522. if faultLabel != "" {
  523. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  524. subscribersTimes[i] = time.Now()
  525. break
  526. }
  527. }
  528. subscribersMutexes[i].Unlock()
  529. }
  530. subscribersTimeMutexes[i].Unlock()
  531. },
  532. })
  533. }
  534. // 20
  535. if topic == masterConfig.TopicOfMapPolygon && len(masterConfig.RuleOfMapPolygon) > 0 {
  536. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  537. Node: commonConfig.RosNode,
  538. Topic: topic,
  539. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  540. subscribersTimeMutexes[i].Lock()
  541. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  542. subscribersMutexes[i].Lock()
  543. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  544. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  545. var faultLabel string
  546. for _, f := range masterConfig.RuleOfMapPolygon {
  547. faultLabel = f(data)
  548. if faultLabel != "" {
  549. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  550. subscribersTimes[i] = time.Now()
  551. break
  552. }
  553. }
  554. subscribersMutexes[i].Unlock()
  555. }
  556. subscribersTimeMutexes[i].Unlock()
  557. },
  558. })
  559. }
  560. // 21
  561. if topic == masterConfig.TopicOfObstacleDisplay && len(masterConfig.RuleOfObstacleDisplay) > 0 {
  562. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  563. Node: commonConfig.RosNode,
  564. Topic: topic,
  565. Callback: func(data *visualization_msgs.MarkerArray) {
  566. subscribersTimeMutexes[i].Lock()
  567. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  568. subscribersMutexes[i].Lock()
  569. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  570. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  571. var faultLabel string
  572. for _, f := range masterConfig.RuleOfObstacleDisplay {
  573. faultLabel = f(data)
  574. if faultLabel != "" {
  575. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  576. subscribersTimes[i] = time.Now()
  577. break
  578. }
  579. }
  580. subscribersMutexes[i].Unlock()
  581. }
  582. subscribersTimeMutexes[i].Unlock()
  583. },
  584. })
  585. }
  586. // 22
  587. if topic == masterConfig.TopicOfPjControlPub && len(masterConfig.RuleOfPjControlPub) > 0 {
  588. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  589. Node: commonConfig.RosNode,
  590. Topic: topic,
  591. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  592. subscribersTimeMutexes[i].Lock()
  593. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  594. subscribersMutexes[i].Lock()
  595. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  596. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  597. var faultLabel string
  598. for _, f := range masterConfig.RuleOfPjControlPub {
  599. faultLabel = f(data)
  600. if faultLabel != "" {
  601. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  602. subscribersTimes[i] = time.Now()
  603. break
  604. }
  605. }
  606. subscribersMutexes[i].Unlock()
  607. }
  608. subscribersTimeMutexes[i].Unlock()
  609. },
  610. })
  611. }
  612. // 23
  613. if topic == masterConfig.TopicOfPointsCluster && len(masterConfig.RuleOfPointsCluster) > 0 {
  614. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  615. Node: commonConfig.RosNode,
  616. Topic: topic,
  617. Callback: func(data *sensor_msgs.PointCloud2) {
  618. subscribersTimeMutexes[i].Lock()
  619. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  620. subscribersMutexes[i].Lock()
  621. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  622. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  623. var faultLabel string
  624. for _, f := range masterConfig.RuleOfPointsCluster {
  625. faultLabel = f(data)
  626. if faultLabel != "" {
  627. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  628. subscribersTimes[i] = time.Now()
  629. break
  630. }
  631. }
  632. subscribersMutexes[i].Unlock()
  633. }
  634. subscribersTimeMutexes[i].Unlock()
  635. },
  636. })
  637. }
  638. // 24
  639. if topic == masterConfig.TopicOfPointsConcat && len(masterConfig.RuleOfPointsConcat) > 0 {
  640. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  641. Node: commonConfig.RosNode,
  642. Topic: topic,
  643. Callback: func(data *sensor_msgs.PointCloud2) {
  644. subscribersTimeMutexes[i].Lock()
  645. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  646. subscribersMutexes[i].Lock()
  647. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  648. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  649. var faultLabel string
  650. for _, f := range masterConfig.RuleOfPointsConcat {
  651. faultLabel = f(data)
  652. if faultLabel != "" {
  653. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  654. subscribersTimes[i] = time.Now()
  655. break
  656. }
  657. }
  658. subscribersMutexes[i].Unlock()
  659. }
  660. subscribersTimeMutexes[i].Unlock()
  661. },
  662. })
  663. }
  664. // 25
  665. if topic == masterConfig.TopicOfReferenceDisplay && len(masterConfig.RuleOfReferenceDisplay) > 0 {
  666. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  667. Node: commonConfig.RosNode,
  668. Topic: topic,
  669. Callback: func(data *nav_msgs.Path) {
  670. subscribersTimeMutexes[i].Lock()
  671. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  672. subscribersMutexes[i].Lock()
  673. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  674. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  675. var faultLabel string
  676. for _, f := range masterConfig.RuleOfReferenceDisplay {
  677. faultLabel = f(data)
  678. if faultLabel != "" {
  679. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  680. subscribersTimes[i] = time.Now()
  681. break
  682. }
  683. }
  684. subscribersMutexes[i].Unlock()
  685. }
  686. subscribersTimeMutexes[i].Unlock()
  687. },
  688. })
  689. }
  690. // 26
  691. if topic == masterConfig.TopicOfReferenceTrajectory && len(masterConfig.RuleOfReferenceTrajectory) > 0 {
  692. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  693. Node: commonConfig.RosNode,
  694. Topic: topic,
  695. Callback: func(data *pjisuv_msgs.Trajectory) {
  696. subscribersTimeMutexes[i].Lock()
  697. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  698. subscribersMutexes[i].Lock()
  699. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  700. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  701. var faultLabel string
  702. for _, f := range masterConfig.RuleOfReferenceTrajectory {
  703. faultLabel = f(data)
  704. if faultLabel != "" {
  705. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  706. subscribersTimes[i] = time.Now()
  707. break
  708. }
  709. }
  710. subscribersMutexes[i].Unlock()
  711. }
  712. subscribersTimeMutexes[i].Unlock()
  713. },
  714. })
  715. }
  716. // 27
  717. if topic == masterConfig.TopicOfRoiPoints && len(masterConfig.RuleOfRoiPoints) > 0 {
  718. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  719. Node: commonConfig.RosNode,
  720. Topic: topic,
  721. Callback: func(data *sensor_msgs.PointCloud2) {
  722. subscribersTimeMutexes[i].Lock()
  723. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  724. subscribersMutexes[i].Lock()
  725. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  726. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  727. var faultLabel string
  728. for _, f := range masterConfig.RuleOfRoiPoints {
  729. faultLabel = f(data)
  730. if faultLabel != "" {
  731. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  732. subscribersTimes[i] = time.Now()
  733. break
  734. }
  735. }
  736. subscribersMutexes[i].Unlock()
  737. }
  738. subscribersTimeMutexes[i].Unlock()
  739. },
  740. })
  741. }
  742. // 28
  743. if topic == masterConfig.TopicOfRoiPolygon && len(masterConfig.RuleOfRoiPolygon) > 0 {
  744. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  745. Node: commonConfig.RosNode,
  746. Topic: topic,
  747. Callback: func(data *nav_msgs.Path) {
  748. subscribersTimeMutexes[i].Lock()
  749. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  750. subscribersMutexes[i].Lock()
  751. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  752. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  753. var faultLabel string
  754. for _, f := range masterConfig.RuleOfRoiPolygon {
  755. faultLabel = f(data)
  756. if faultLabel != "" {
  757. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  758. subscribersTimes[i] = time.Now()
  759. break
  760. }
  761. }
  762. subscribersMutexes[i].Unlock()
  763. }
  764. subscribersTimeMutexes[i].Unlock()
  765. },
  766. })
  767. }
  768. // 29
  769. if topic == masterConfig.TopicOfTf && len(masterConfig.RuleOfTf) > 0 {
  770. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  771. Node: commonConfig.RosNode,
  772. Topic: topic,
  773. Callback: func(data *tf2_msgs.TFMessage) {
  774. subscribersTimeMutexes[i].Lock()
  775. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  776. subscribersMutexes[i].Lock()
  777. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  778. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  779. var faultLabel string
  780. for _, f := range masterConfig.RuleOfTf {
  781. faultLabel = f(data)
  782. if faultLabel != "" {
  783. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  784. subscribersTimes[i] = time.Now()
  785. break
  786. }
  787. }
  788. subscribersMutexes[i].Unlock()
  789. }
  790. subscribersTimeMutexes[i].Unlock()
  791. },
  792. })
  793. }
  794. // 30
  795. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  796. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  797. Node: commonConfig.RosNode,
  798. Topic: topic,
  799. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  800. subscribersTimeMutexes[i].Lock()
  801. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  802. subscribersMutexes[i].Lock()
  803. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  804. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  805. var faultLabel string
  806. for _, f := range masterConfig.RuleOfTpperception {
  807. faultLabel = f(data, velocityX, velocityY, yaw)
  808. if faultLabel != "" {
  809. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  810. break
  811. }
  812. }
  813. subscribersMutexes[i].Unlock()
  814. }
  815. subscribersTimeMutexes[i].Unlock()
  816. },
  817. })
  818. }
  819. // 31
  820. if topic == masterConfig.TopicOfTpperceptionVis && len(masterConfig.RuleOfTpperceptionVis) > 0 {
  821. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  822. Node: commonConfig.RosNode,
  823. Topic: topic,
  824. Callback: func(data *visualization_msgs.MarkerArray) {
  825. subscribersTimeMutexes[i].Lock()
  826. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  827. subscribersMutexes[i].Lock()
  828. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  829. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  830. var faultLabel string
  831. for _, f := range masterConfig.RuleOfTpperceptionVis {
  832. faultLabel = f(data)
  833. if faultLabel != "" {
  834. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  835. subscribersTimes[i] = time.Now()
  836. break
  837. }
  838. }
  839. subscribersMutexes[i].Unlock()
  840. }
  841. subscribersTimeMutexes[i].Unlock()
  842. },
  843. })
  844. }
  845. // 32
  846. if topic == masterConfig.TopicOfTprouteplan && len(masterConfig.RuleOfTprouteplan) > 0 {
  847. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  848. Node: commonConfig.RosNode,
  849. Topic: topic,
  850. Callback: func(data *pjisuv_msgs.RoutePlan) {
  851. subscribersTimeMutexes[i].Lock()
  852. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  853. subscribersMutexes[i].Lock()
  854. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  855. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  856. var faultLabel string
  857. for _, f := range masterConfig.RuleOfTprouteplan {
  858. faultLabel = f(data)
  859. if faultLabel != "" {
  860. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  861. subscribersTimes[i] = time.Now()
  862. break
  863. }
  864. }
  865. subscribersMutexes[i].Unlock()
  866. }
  867. subscribersTimeMutexes[i].Unlock()
  868. },
  869. })
  870. }
  871. // 33
  872. if topic == masterConfig.TopicOfTrajectoryDisplay && len(masterConfig.RuleOfTrajectoryDisplay) > 0 {
  873. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  874. Node: commonConfig.RosNode,
  875. Topic: topic,
  876. Callback: func(data *nav_msgs.Path) {
  877. subscribersTimeMutexes[i].Lock()
  878. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  879. subscribersMutexes[i].Lock()
  880. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  881. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  882. var faultLabel string
  883. for _, f := range masterConfig.RuleOfTrajectoryDisplay {
  884. faultLabel = f(data)
  885. if faultLabel != "" {
  886. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  887. subscribersTimes[i] = time.Now()
  888. break
  889. }
  890. }
  891. subscribersMutexes[i].Unlock()
  892. }
  893. subscribersTimeMutexes[i].Unlock()
  894. },
  895. })
  896. }
  897. // 34
  898. if topic == masterConfig.TopicOfUngroundCloudpoints && len(masterConfig.RuleOfUngroundCloudpoints) > 0 {
  899. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  900. Node: commonConfig.RosNode,
  901. Topic: topic,
  902. Callback: func(data *sensor_msgs.PointCloud2) {
  903. subscribersTimeMutexes[i].Lock()
  904. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  905. subscribersMutexes[i].Lock()
  906. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  907. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  908. var faultLabel string
  909. for _, f := range masterConfig.RuleOfUngroundCloudpoints {
  910. faultLabel = f(data)
  911. if faultLabel != "" {
  912. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  913. subscribersTimes[i] = time.Now()
  914. break
  915. }
  916. }
  917. subscribersMutexes[i].Unlock()
  918. }
  919. subscribersTimeMutexes[i].Unlock()
  920. },
  921. })
  922. }
  923. // 35
  924. if topic == masterConfig.TopicOfCameraImage && len(masterConfig.RuleOfCameraImage) > 0 {
  925. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  926. Node: commonConfig.RosNode,
  927. Topic: topic,
  928. Callback: func(data *sensor_msgs.Image) {
  929. subscribersTimeMutexes[i].Lock()
  930. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  931. subscribersMutexes[i].Lock()
  932. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  933. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  934. var faultLabel string
  935. for _, f := range masterConfig.RuleOfCameraImage {
  936. faultLabel = f(data)
  937. if faultLabel != "" {
  938. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  939. subscribersTimes[i] = time.Now()
  940. break
  941. }
  942. }
  943. subscribersMutexes[i].Unlock()
  944. }
  945. subscribersTimeMutexes[i].Unlock()
  946. },
  947. })
  948. }
  949. // 36
  950. if topic == masterConfig.TopicOfDataRead && len(masterConfig.RuleOfDataRead) > 0 {
  951. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  952. Node: commonConfig.RosNode,
  953. Topic: topic,
  954. Callback: func(data *pjisuv_msgs.Retrieval) {
  955. subscribersTimeMutexes[i].Lock()
  956. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  957. subscribersMutexes[i].Lock()
  958. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  959. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  960. var faultLabel string
  961. for _, f := range masterConfig.RuleOfDataRead {
  962. faultLabel = f(data)
  963. if faultLabel != "" {
  964. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  965. subscribersTimes[i] = time.Now()
  966. break
  967. }
  968. }
  969. subscribersMutexes[i].Unlock()
  970. }
  971. subscribersTimeMutexes[i].Unlock()
  972. },
  973. })
  974. }
  975. // 37
  976. if topic == masterConfig.TopicOfPjiGps && len(masterConfig.RuleOfPjiGps) > 0 {
  977. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  978. Node: commonConfig.RosNode,
  979. Topic: topic,
  980. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  981. subscribersTimeMutexes[i].Lock()
  982. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  983. subscribersMutexes[i].Lock()
  984. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  985. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  986. var faultLabel string
  987. for _, f := range masterConfig.RuleOfPjiGps {
  988. faultLabel = f(data)
  989. if faultLabel != "" {
  990. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  991. subscribersTimes[i] = time.Now()
  992. break
  993. }
  994. }
  995. subscribersMutexes[i].Unlock()
  996. }
  997. subscribersTimeMutexes[i].Unlock()
  998. },
  999. })
  1000. }
  1001. if err != nil {
  1002. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1003. //TODO 如何回传日志
  1004. continue
  1005. }
  1006. }
  1007. select {
  1008. case signal := <-service.ChannelKillWindowProducer:
  1009. if signal == 1 {
  1010. commonConfig.RosNode.Close()
  1011. service.AddKillTimes("3")
  1012. return
  1013. }
  1014. }
  1015. }
  1016. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) {
  1017. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1018. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  1019. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  1020. newTimeWindow := entity.TimeWindow{
  1021. FaultTime: faultHappenTime,
  1022. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  1023. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  1024. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  1025. Labels: []string{faultLabel},
  1026. MasterTopics: masterTopics,
  1027. SlaveTopics: slaveTopics,
  1028. }
  1029. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1030. entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1031. } else {
  1032. // 2-2 如果在旧故障窗口内
  1033. entity.TimeWindowProducerQueueMutex.RLock()
  1034. defer entity.TimeWindowProducerQueueMutex.RUnlock()
  1035. // 2-2-1 更新故障窗口end时间
  1036. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  1037. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1038. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  1039. lastTimeWindow.TimeWindowEnd = maxEnd
  1040. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  1041. } else {
  1042. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1043. lastTimeWindow.TimeWindowEnd = expectEnd
  1044. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1045. }
  1046. }
  1047. // 2-2-2 更新label
  1048. labels := lastTimeWindow.Labels
  1049. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1050. // 2-2-3 更新 topic
  1051. sourceMasterTopics := lastTimeWindow.MasterTopics
  1052. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1053. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1054. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1055. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1056. }
  1057. }
  1058. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1059. // 获取所有需要采集的topic
  1060. var faultCodeTopics []string
  1061. for _, code := range commonConfig.CloudConfig.Triggers {
  1062. if code.Label == faultLabel {
  1063. faultCodeTopics = code.Topics
  1064. }
  1065. }
  1066. // 根据不同节点采集的topic进行分配采集
  1067. for _, acceptTopic := range faultCodeTopics {
  1068. for _, host := range commonConfig.CloudConfig.Hosts {
  1069. for _, topic := range host.Topics {
  1070. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1071. masterTopics = append(masterTopics, acceptTopic)
  1072. }
  1073. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1074. slaveTopics = append(slaveTopics, acceptTopic)
  1075. }
  1076. }
  1077. }
  1078. }
  1079. return masterTopics, slaveTopics
  1080. }