produce_window.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214
  1. package svc
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/package/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "github.com/bluenviron/goroslib/v2"
  11. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. m sync.RWMutex
  21. velocityX float64
  22. velocityY float64
  23. yaw float64
  24. )
  25. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  26. func PrepareTimeWindowProducerQueue() {
  27. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  28. var err error
  29. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  30. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  31. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  32. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  33. for i, topic := range commonConfig.SubscribeTopics {
  34. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  35. // 1
  36. if topic == masterConfig.TopicOfAmrPose && len(masterConfig.RuleOfAmrPose) > 0 {
  37. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  38. Node: commonConfig.RosNode,
  39. Topic: topic,
  40. Callback: func(data *visualization_msgs.MarkerArray) {
  41. if len(masterConfig.TopicOfAmrPose) == 0 {
  42. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  43. return
  44. }
  45. subscribersTimeMutexes[i].Lock()
  46. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  47. subscribersMutexes[i].Lock()
  48. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  49. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  50. var faultLabel string
  51. for _, f := range masterConfig.RuleOfAmrPose {
  52. faultLabel = f(data)
  53. if faultLabel != "" {
  54. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  55. subscribersTimes[i] = time.Now()
  56. break
  57. }
  58. }
  59. subscribersMutexes[i].Unlock()
  60. }
  61. subscribersTimeMutexes[i].Unlock()
  62. },
  63. })
  64. }
  65. // 2
  66. if topic == masterConfig.TopicOfBoundingBoxesFast && len(masterConfig.RuleOfBoundingBoxesFast) > 0 {
  67. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  68. Node: commonConfig.RosNode,
  69. Topic: topic,
  70. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  71. if len(masterConfig.TopicOfBoundingBoxesFast) == 0 {
  72. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  73. return
  74. }
  75. subscribersTimeMutexes[i].Lock()
  76. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  77. subscribersMutexes[i].Lock()
  78. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  79. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  80. var faultLabel string
  81. for _, f := range masterConfig.RuleOfBoundingBoxesFast {
  82. faultLabel = f(data)
  83. if faultLabel != "" {
  84. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  85. subscribersTimes[i] = time.Now()
  86. break
  87. }
  88. }
  89. subscribersMutexes[i].Unlock()
  90. }
  91. subscribersTimeMutexes[i].Unlock()
  92. },
  93. })
  94. }
  95. // 3
  96. if topic == masterConfig.TopicOfCameraFault && len(masterConfig.RuleOfCameraFault) > 0 {
  97. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  98. Node: commonConfig.RosNode,
  99. Topic: topic,
  100. Callback: func(data *pjisuv_msgs.FaultVec) {
  101. if len(masterConfig.TopicOfCameraFault) == 0 {
  102. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  103. return
  104. }
  105. subscribersTimeMutexes[i].Lock()
  106. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  107. subscribersMutexes[i].Lock()
  108. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  109. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  110. var faultLabel string
  111. for _, f := range masterConfig.RuleOfCameraFault {
  112. faultLabel = f(data)
  113. if faultLabel != "" {
  114. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  115. subscribersTimes[i] = time.Now()
  116. break
  117. }
  118. }
  119. subscribersMutexes[i].Unlock()
  120. }
  121. subscribersTimeMutexes[i].Unlock()
  122. },
  123. })
  124. }
  125. // 4
  126. if topic == masterConfig.TopicOfCanData && len(masterConfig.RuleOfCanData) > 0 {
  127. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  128. Node: commonConfig.RosNode,
  129. Topic: topic,
  130. Callback: func(data *pjisuv_msgs.Frame) {
  131. if len(masterConfig.TopicOfCanData) == 0 {
  132. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  133. return
  134. }
  135. subscribersTimeMutexes[i].Lock()
  136. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  137. subscribersMutexes[i].Lock()
  138. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  139. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  140. var faultLabel string
  141. for _, f := range masterConfig.RuleOfCanData {
  142. faultLabel = f(data)
  143. if faultLabel != "" {
  144. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  145. subscribersTimes[i] = time.Now()
  146. break
  147. }
  148. }
  149. subscribersMutexes[i].Unlock()
  150. }
  151. subscribersTimeMutexes[i].Unlock()
  152. },
  153. })
  154. }
  155. // 5
  156. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && len(masterConfig.RuleOfCh128x1LslidarPointCloud) > 0 {
  157. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  158. Node: commonConfig.RosNode,
  159. Topic: topic,
  160. Callback: func(data *sensor_msgs.PointCloud2) {
  161. if len(masterConfig.TopicOfCh128x1LslidarPointCloud) == 0 {
  162. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  163. return
  164. }
  165. subscribersTimeMutexes[i].Lock()
  166. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  167. subscribersMutexes[i].Lock()
  168. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  169. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  170. var faultLabel string
  171. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud {
  172. faultLabel = f(data)
  173. if faultLabel != "" {
  174. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  175. subscribersTimes[i] = time.Now()
  176. break
  177. }
  178. }
  179. subscribersMutexes[i].Unlock()
  180. }
  181. subscribersTimeMutexes[i].Unlock()
  182. },
  183. })
  184. }
  185. // 6
  186. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && len(masterConfig.RuleOfCh64wLLslidarPointCloud) > 0 {
  187. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  188. Node: commonConfig.RosNode,
  189. Topic: topic,
  190. Callback: func(data *sensor_msgs.PointCloud2) {
  191. if len(masterConfig.TopicOfCh64wLLslidarPointCloud) == 0 {
  192. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  193. return
  194. }
  195. subscribersTimeMutexes[i].Lock()
  196. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  197. subscribersMutexes[i].Lock()
  198. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  199. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  200. var faultLabel string
  201. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud {
  202. faultLabel = f(data)
  203. if faultLabel != "" {
  204. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  205. subscribersTimes[i] = time.Now()
  206. break
  207. }
  208. }
  209. subscribersMutexes[i].Unlock()
  210. }
  211. subscribersTimeMutexes[i].Unlock()
  212. },
  213. })
  214. }
  215. // 7
  216. if topic == masterConfig.TopicOfCh64wLScan && len(masterConfig.RuleOfCh64wLScan) > 0 {
  217. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  218. Node: commonConfig.RosNode,
  219. Topic: topic,
  220. Callback: func(data *sensor_msgs.LaserScan) {
  221. if len(masterConfig.TopicOfCh64wLScan) == 0 {
  222. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  223. return
  224. }
  225. subscribersTimeMutexes[i].Lock()
  226. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  227. subscribersMutexes[i].Lock()
  228. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  229. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  230. var faultLabel string
  231. for _, f := range masterConfig.RuleOfCh64wLScan {
  232. faultLabel = f(data)
  233. if faultLabel != "" {
  234. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  235. subscribersTimes[i] = time.Now()
  236. break
  237. }
  238. }
  239. subscribersMutexes[i].Unlock()
  240. }
  241. subscribersTimeMutexes[i].Unlock()
  242. },
  243. })
  244. }
  245. // 8
  246. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && len(masterConfig.RuleOfCh64wRLslidarPointCloud) > 0 {
  247. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  248. Node: commonConfig.RosNode,
  249. Topic: topic,
  250. Callback: func(data *sensor_msgs.PointCloud2) {
  251. if len(masterConfig.TopicOfCh64wRLslidarPointCloud) == 0 {
  252. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  253. return
  254. }
  255. subscribersTimeMutexes[i].Lock()
  256. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  257. subscribersMutexes[i].Lock()
  258. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  259. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  260. var faultLabel string
  261. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud {
  262. faultLabel = f(data)
  263. if faultLabel != "" {
  264. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  265. subscribersTimes[i] = time.Now()
  266. break
  267. }
  268. }
  269. subscribersMutexes[i].Unlock()
  270. }
  271. subscribersTimeMutexes[i].Unlock()
  272. },
  273. })
  274. }
  275. // 9
  276. if topic == masterConfig.TopicOfCh64wRScan && len(masterConfig.RuleOfCh64wRScan) > 0 {
  277. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  278. Node: commonConfig.RosNode,
  279. Topic: topic,
  280. Callback: func(data *sensor_msgs.LaserScan) {
  281. if len(masterConfig.TopicOfCh64wRScan) == 0 {
  282. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  283. return
  284. }
  285. subscribersTimeMutexes[i].Lock()
  286. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  287. subscribersMutexes[i].Lock()
  288. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  289. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  290. var faultLabel string
  291. for _, f := range masterConfig.RuleOfCh64wRScan {
  292. faultLabel = f(data)
  293. if faultLabel != "" {
  294. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  295. subscribersTimes[i] = time.Now()
  296. break
  297. }
  298. }
  299. subscribersMutexes[i].Unlock()
  300. }
  301. subscribersTimeMutexes[i].Unlock()
  302. },
  303. })
  304. }
  305. // 10
  306. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && len(masterConfig.RuleOfCicvLidarclusterMovingObjects) > 0 {
  307. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  308. Node: commonConfig.RosNode,
  309. Topic: topic,
  310. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  311. if len(masterConfig.TopicOfCicvLidarclusterMovingObjects) == 0 {
  312. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  313. return
  314. }
  315. subscribersTimeMutexes[i].Lock()
  316. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  317. subscribersMutexes[i].Lock()
  318. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  319. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  320. var faultLabel string
  321. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects {
  322. faultLabel = f(data)
  323. if faultLabel != "" {
  324. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  325. subscribersTimes[i] = time.Now()
  326. break
  327. }
  328. }
  329. subscribersMutexes[i].Unlock()
  330. }
  331. subscribersTimeMutexes[i].Unlock()
  332. },
  333. })
  334. }
  335. // 11
  336. if topic == masterConfig.TopicOfCicvAmrTrajectory && len(masterConfig.RuleOfCicvAmrTrajectory) > 0 {
  337. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  338. Node: commonConfig.RosNode,
  339. Topic: topic,
  340. Callback: func(data *pjisuv_msgs.Trajectory) {
  341. if len(masterConfig.TopicOfCicvAmrTrajectory) == 0 {
  342. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  343. return
  344. }
  345. subscribersTimeMutexes[i].Lock()
  346. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  347. subscribersMutexes[i].Lock()
  348. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  349. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  350. var faultLabel string
  351. for _, f := range masterConfig.RuleOfCicvAmrTrajectory {
  352. faultLabel = f(data)
  353. if faultLabel != "" {
  354. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  355. subscribersTimes[i] = time.Now()
  356. break
  357. }
  358. }
  359. subscribersMutexes[i].Unlock()
  360. }
  361. subscribersTimeMutexes[i].Unlock()
  362. },
  363. })
  364. }
  365. // 12
  366. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  367. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  368. Node: commonConfig.RosNode,
  369. Topic: topic,
  370. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  371. // 更新共享变量
  372. m.RLock()
  373. velocityX = data.VelocityX
  374. velocityY = data.VelocityY
  375. yaw = data.Yaw
  376. m.RUnlock()
  377. if len(masterConfig.TopicOfCicvLocation) == 0 {
  378. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  379. return
  380. }
  381. subscribersTimeMutexes[i].Lock()
  382. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  383. subscribersMutexes[i].Lock()
  384. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  385. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  386. var faultLabel string
  387. for _, f := range masterConfig.RuleOfCicvLocation {
  388. faultLabel = f(data)
  389. if faultLabel != "" {
  390. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  391. break
  392. }
  393. }
  394. subscribersMutexes[i].Unlock()
  395. }
  396. subscribersTimeMutexes[i].Unlock()
  397. },
  398. })
  399. }
  400. // 13
  401. if topic == masterConfig.TopicOfCloudClusters && len(masterConfig.RuleOfCloudClusters) > 0 {
  402. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  403. Node: commonConfig.RosNode,
  404. Topic: topic,
  405. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  406. if len(masterConfig.TopicOfCloudClusters) == 0 {
  407. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  408. return
  409. }
  410. subscribersTimeMutexes[i].Lock()
  411. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  412. subscribersMutexes[i].Lock()
  413. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  414. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  415. var faultLabel string
  416. for _, f := range masterConfig.RuleOfCloudClusters {
  417. faultLabel = f(data)
  418. if faultLabel != "" {
  419. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  420. subscribersTimes[i] = time.Now()
  421. break
  422. }
  423. }
  424. subscribersMutexes[i].Unlock()
  425. }
  426. subscribersTimeMutexes[i].Unlock()
  427. },
  428. })
  429. }
  430. // 14
  431. if topic == masterConfig.TopicOfHeartbeatInfo && len(masterConfig.RuleOfHeartbeatInfo) > 0 {
  432. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  433. Node: commonConfig.RosNode,
  434. Topic: topic,
  435. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  436. if len(masterConfig.TopicOfHeartbeatInfo) == 0 {
  437. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  438. return
  439. }
  440. subscribersTimeMutexes[i].Lock()
  441. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  442. subscribersMutexes[i].Lock()
  443. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  444. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  445. var faultLabel string
  446. for _, f := range masterConfig.RuleOfHeartbeatInfo {
  447. faultLabel = f(data)
  448. if faultLabel != "" {
  449. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  450. subscribersTimes[i] = time.Now()
  451. break
  452. }
  453. }
  454. subscribersMutexes[i].Unlock()
  455. }
  456. subscribersTimeMutexes[i].Unlock()
  457. },
  458. })
  459. }
  460. // 15
  461. if topic == masterConfig.TopicOfLidarPretreatmentCost && len(masterConfig.RuleOfLidarPretreatmentCost) > 0 {
  462. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  463. Node: commonConfig.RosNode,
  464. Topic: topic,
  465. Callback: func(data *geometry_msgs.Vector3Stamped) {
  466. if len(masterConfig.TopicOfLidarPretreatmentCost) == 0 {
  467. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  468. return
  469. }
  470. subscribersTimeMutexes[i].Lock()
  471. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  472. subscribersMutexes[i].Lock()
  473. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  474. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  475. var faultLabel string
  476. for _, f := range masterConfig.RuleOfLidarPretreatmentCost {
  477. faultLabel = f(data)
  478. if faultLabel != "" {
  479. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  480. subscribersTimes[i] = time.Now()
  481. break
  482. }
  483. }
  484. subscribersMutexes[i].Unlock()
  485. }
  486. subscribersTimeMutexes[i].Unlock()
  487. },
  488. })
  489. }
  490. // 16
  491. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && len(masterConfig.RuleOfLidarPretreatmentOdometry) > 0 {
  492. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  493. Node: commonConfig.RosNode,
  494. Topic: topic,
  495. Callback: func(data *nav_msgs.Odometry) {
  496. if len(masterConfig.TopicOfLidarPretreatmentOdometry) == 0 {
  497. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  498. return
  499. }
  500. subscribersTimeMutexes[i].Lock()
  501. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  502. subscribersMutexes[i].Lock()
  503. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  504. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  505. var faultLabel string
  506. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry {
  507. faultLabel = f(data)
  508. if faultLabel != "" {
  509. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  510. subscribersTimes[i] = time.Now()
  511. break
  512. }
  513. }
  514. subscribersMutexes[i].Unlock()
  515. }
  516. subscribersTimeMutexes[i].Unlock()
  517. },
  518. })
  519. }
  520. // 17
  521. if topic == masterConfig.TopicOfLidarRoi && len(masterConfig.RuleOfLidarRoi) > 0 {
  522. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  523. Node: commonConfig.RosNode,
  524. Topic: topic,
  525. Callback: func(data *geometry_msgs.PolygonStamped) {
  526. if len(masterConfig.TopicOfLidarRoi) == 0 {
  527. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  528. return
  529. }
  530. subscribersTimeMutexes[i].Lock()
  531. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  532. subscribersMutexes[i].Lock()
  533. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  534. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  535. var faultLabel string
  536. for _, f := range masterConfig.RuleOfLidarRoi {
  537. faultLabel = f(data)
  538. if faultLabel != "" {
  539. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  540. subscribersTimes[i] = time.Now()
  541. break
  542. }
  543. }
  544. subscribersMutexes[i].Unlock()
  545. }
  546. subscribersTimeMutexes[i].Unlock()
  547. },
  548. })
  549. }
  550. // 18
  551. if topic == masterConfig.TopicOfLine1 && len(masterConfig.RuleOfLine1) > 0 {
  552. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  553. Node: commonConfig.RosNode,
  554. Topic: topic,
  555. Callback: func(data *nav_msgs.Path) {
  556. if len(masterConfig.TopicOfLine1) == 0 {
  557. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  558. return
  559. }
  560. subscribersTimeMutexes[i].Lock()
  561. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  562. subscribersMutexes[i].Lock()
  563. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  564. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  565. var faultLabel string
  566. for _, f := range masterConfig.RuleOfLine1 {
  567. faultLabel = f(data)
  568. if faultLabel != "" {
  569. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  570. subscribersTimes[i] = time.Now()
  571. break
  572. }
  573. }
  574. subscribersMutexes[i].Unlock()
  575. }
  576. subscribersTimeMutexes[i].Unlock()
  577. },
  578. })
  579. }
  580. // 19
  581. if topic == masterConfig.TopicOfLine2 && len(masterConfig.RuleOfLine2) > 0 {
  582. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  583. Node: commonConfig.RosNode,
  584. Topic: topic,
  585. Callback: func(data *nav_msgs.Path) {
  586. if len(masterConfig.TopicOfLine2) == 0 {
  587. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  588. return
  589. }
  590. subscribersTimeMutexes[i].Lock()
  591. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  592. subscribersMutexes[i].Lock()
  593. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  594. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  595. var faultLabel string
  596. for _, f := range masterConfig.RuleOfLine2 {
  597. faultLabel = f(data)
  598. if faultLabel != "" {
  599. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  600. subscribersTimes[i] = time.Now()
  601. break
  602. }
  603. }
  604. subscribersMutexes[i].Unlock()
  605. }
  606. subscribersTimeMutexes[i].Unlock()
  607. },
  608. })
  609. }
  610. // 20
  611. if topic == masterConfig.TopicOfMapPolygon && len(masterConfig.RuleOfMapPolygon) > 0 {
  612. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  613. Node: commonConfig.RosNode,
  614. Topic: topic,
  615. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  616. if len(masterConfig.TopicOfMapPolygon) == 0 {
  617. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  618. return
  619. }
  620. subscribersTimeMutexes[i].Lock()
  621. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  622. subscribersMutexes[i].Lock()
  623. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  624. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  625. var faultLabel string
  626. for _, f := range masterConfig.RuleOfMapPolygon {
  627. faultLabel = f(data)
  628. if faultLabel != "" {
  629. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  630. subscribersTimes[i] = time.Now()
  631. break
  632. }
  633. }
  634. subscribersMutexes[i].Unlock()
  635. }
  636. subscribersTimeMutexes[i].Unlock()
  637. },
  638. })
  639. }
  640. // 21
  641. if topic == masterConfig.TopicOfObstacleDisplay && len(masterConfig.RuleOfObstacleDisplay) > 0 {
  642. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  643. Node: commonConfig.RosNode,
  644. Topic: topic,
  645. Callback: func(data *visualization_msgs.MarkerArray) {
  646. if len(masterConfig.TopicOfObstacleDisplay) == 0 {
  647. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  648. return
  649. }
  650. subscribersTimeMutexes[i].Lock()
  651. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  652. subscribersMutexes[i].Lock()
  653. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  654. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  655. var faultLabel string
  656. for _, f := range masterConfig.RuleOfObstacleDisplay {
  657. faultLabel = f(data)
  658. if faultLabel != "" {
  659. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  660. subscribersTimes[i] = time.Now()
  661. break
  662. }
  663. }
  664. subscribersMutexes[i].Unlock()
  665. }
  666. subscribersTimeMutexes[i].Unlock()
  667. },
  668. })
  669. }
  670. // 22
  671. if topic == masterConfig.TopicOfPjControlPub && len(masterConfig.RuleOfPjControlPub) > 0 {
  672. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  673. Node: commonConfig.RosNode,
  674. Topic: topic,
  675. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  676. if len(masterConfig.TopicOfPjControlPub) == 0 {
  677. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  678. return
  679. }
  680. subscribersTimeMutexes[i].Lock()
  681. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  682. subscribersMutexes[i].Lock()
  683. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  684. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  685. var faultLabel string
  686. for _, f := range masterConfig.RuleOfPjControlPub {
  687. faultLabel = f(data)
  688. if faultLabel != "" {
  689. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  690. subscribersTimes[i] = time.Now()
  691. break
  692. }
  693. }
  694. subscribersMutexes[i].Unlock()
  695. }
  696. subscribersTimeMutexes[i].Unlock()
  697. },
  698. })
  699. }
  700. // 23
  701. if topic == masterConfig.TopicOfPointsCluster && len(masterConfig.RuleOfPointsCluster) > 0 {
  702. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  703. Node: commonConfig.RosNode,
  704. Topic: topic,
  705. Callback: func(data *sensor_msgs.PointCloud2) {
  706. if len(masterConfig.TopicOfPointsCluster) == 0 {
  707. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  708. return
  709. }
  710. subscribersTimeMutexes[i].Lock()
  711. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  712. subscribersMutexes[i].Lock()
  713. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  714. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  715. var faultLabel string
  716. for _, f := range masterConfig.RuleOfPointsCluster {
  717. faultLabel = f(data)
  718. if faultLabel != "" {
  719. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  720. subscribersTimes[i] = time.Now()
  721. break
  722. }
  723. }
  724. subscribersMutexes[i].Unlock()
  725. }
  726. subscribersTimeMutexes[i].Unlock()
  727. },
  728. })
  729. }
  730. // 24
  731. if topic == masterConfig.TopicOfPointsConcat && len(masterConfig.RuleOfPointsConcat) > 0 {
  732. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  733. Node: commonConfig.RosNode,
  734. Topic: topic,
  735. Callback: func(data *sensor_msgs.PointCloud2) {
  736. if len(masterConfig.TopicOfPointsConcat) == 0 {
  737. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  738. return
  739. }
  740. subscribersTimeMutexes[i].Lock()
  741. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  742. subscribersMutexes[i].Lock()
  743. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  744. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  745. var faultLabel string
  746. for _, f := range masterConfig.RuleOfPointsConcat {
  747. faultLabel = f(data)
  748. if faultLabel != "" {
  749. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  750. subscribersTimes[i] = time.Now()
  751. break
  752. }
  753. }
  754. subscribersMutexes[i].Unlock()
  755. }
  756. subscribersTimeMutexes[i].Unlock()
  757. },
  758. })
  759. }
  760. // 25
  761. if topic == masterConfig.TopicOfReferenceDisplay && len(masterConfig.RuleOfReferenceDisplay) > 0 {
  762. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  763. Node: commonConfig.RosNode,
  764. Topic: topic,
  765. Callback: func(data *nav_msgs.Path) {
  766. if len(masterConfig.TopicOfReferenceDisplay) == 0 {
  767. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  768. return
  769. }
  770. subscribersTimeMutexes[i].Lock()
  771. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  772. subscribersMutexes[i].Lock()
  773. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  774. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  775. var faultLabel string
  776. for _, f := range masterConfig.RuleOfReferenceDisplay {
  777. faultLabel = f(data)
  778. if faultLabel != "" {
  779. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  780. subscribersTimes[i] = time.Now()
  781. break
  782. }
  783. }
  784. subscribersMutexes[i].Unlock()
  785. }
  786. subscribersTimeMutexes[i].Unlock()
  787. },
  788. })
  789. }
  790. // 26
  791. if topic == masterConfig.TopicOfReferenceTrajectory && len(masterConfig.RuleOfReferenceTrajectory) > 0 {
  792. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  793. Node: commonConfig.RosNode,
  794. Topic: topic,
  795. Callback: func(data *pjisuv_msgs.Trajectory) {
  796. if len(masterConfig.TopicOfReferenceTrajectory) == 0 {
  797. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  798. return
  799. }
  800. subscribersTimeMutexes[i].Lock()
  801. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  802. subscribersMutexes[i].Lock()
  803. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  804. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  805. var faultLabel string
  806. for _, f := range masterConfig.RuleOfReferenceTrajectory {
  807. faultLabel = f(data)
  808. if faultLabel != "" {
  809. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  810. subscribersTimes[i] = time.Now()
  811. break
  812. }
  813. }
  814. subscribersMutexes[i].Unlock()
  815. }
  816. subscribersTimeMutexes[i].Unlock()
  817. },
  818. })
  819. }
  820. // 27
  821. if topic == masterConfig.TopicOfRoiPoints && len(masterConfig.RuleOfRoiPoints) > 0 {
  822. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  823. Node: commonConfig.RosNode,
  824. Topic: topic,
  825. Callback: func(data *sensor_msgs.PointCloud2) {
  826. if len(masterConfig.TopicOfRoiPoints) == 0 {
  827. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  828. return
  829. }
  830. subscribersTimeMutexes[i].Lock()
  831. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  832. subscribersMutexes[i].Lock()
  833. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  834. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  835. var faultLabel string
  836. for _, f := range masterConfig.RuleOfRoiPoints {
  837. faultLabel = f(data)
  838. if faultLabel != "" {
  839. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  840. subscribersTimes[i] = time.Now()
  841. break
  842. }
  843. }
  844. subscribersMutexes[i].Unlock()
  845. }
  846. subscribersTimeMutexes[i].Unlock()
  847. },
  848. })
  849. }
  850. // 28
  851. if topic == masterConfig.TopicOfRoiPolygon && len(masterConfig.RuleOfRoiPolygon) > 0 {
  852. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  853. Node: commonConfig.RosNode,
  854. Topic: topic,
  855. Callback: func(data *nav_msgs.Path) {
  856. if len(masterConfig.TopicOfRoiPolygon) == 0 {
  857. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  858. return
  859. }
  860. subscribersTimeMutexes[i].Lock()
  861. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  862. subscribersMutexes[i].Lock()
  863. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  864. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  865. var faultLabel string
  866. for _, f := range masterConfig.RuleOfRoiPolygon {
  867. faultLabel = f(data)
  868. if faultLabel != "" {
  869. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  870. subscribersTimes[i] = time.Now()
  871. break
  872. }
  873. }
  874. subscribersMutexes[i].Unlock()
  875. }
  876. subscribersTimeMutexes[i].Unlock()
  877. },
  878. })
  879. }
  880. // 29
  881. if topic == masterConfig.TopicOfTf && len(masterConfig.RuleOfTf) > 0 {
  882. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  883. Node: commonConfig.RosNode,
  884. Topic: topic,
  885. Callback: func(data *tf2_msgs.TFMessage) {
  886. if len(masterConfig.TopicOfTf) == 0 {
  887. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  888. return
  889. }
  890. subscribersTimeMutexes[i].Lock()
  891. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  892. subscribersMutexes[i].Lock()
  893. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  894. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  895. var faultLabel string
  896. for _, f := range masterConfig.RuleOfTf {
  897. faultLabel = f(data)
  898. if faultLabel != "" {
  899. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  900. subscribersTimes[i] = time.Now()
  901. break
  902. }
  903. }
  904. subscribersMutexes[i].Unlock()
  905. }
  906. subscribersTimeMutexes[i].Unlock()
  907. },
  908. })
  909. }
  910. // 30
  911. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  912. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  913. Node: commonConfig.RosNode,
  914. Topic: topic,
  915. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  916. if len(masterConfig.TopicOfTpperception) == 0 {
  917. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  918. return
  919. }
  920. subscribersTimeMutexes[i].Lock()
  921. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  922. subscribersMutexes[i].Lock()
  923. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  924. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  925. var faultLabel string
  926. for _, f := range masterConfig.RuleOfTpperception {
  927. faultLabel = f(data, velocityX, velocityY, yaw)
  928. if faultLabel != "" {
  929. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  930. break
  931. }
  932. }
  933. subscribersMutexes[i].Unlock()
  934. }
  935. subscribersTimeMutexes[i].Unlock()
  936. },
  937. })
  938. }
  939. // 31
  940. if topic == masterConfig.TopicOfTpperceptionVis && len(masterConfig.RuleOfTpperceptionVis) > 0 {
  941. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  942. Node: commonConfig.RosNode,
  943. Topic: topic,
  944. Callback: func(data *visualization_msgs.MarkerArray) {
  945. if len(masterConfig.TopicOfTpperceptionVis) == 0 {
  946. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  947. return
  948. }
  949. subscribersTimeMutexes[i].Lock()
  950. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  951. subscribersMutexes[i].Lock()
  952. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  953. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  954. var faultLabel string
  955. for _, f := range masterConfig.RuleOfTpperceptionVis {
  956. faultLabel = f(data)
  957. if faultLabel != "" {
  958. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  959. subscribersTimes[i] = time.Now()
  960. break
  961. }
  962. }
  963. subscribersMutexes[i].Unlock()
  964. }
  965. subscribersTimeMutexes[i].Unlock()
  966. },
  967. })
  968. }
  969. // 32
  970. if topic == masterConfig.TopicOfTprouteplan && len(masterConfig.RuleOfTprouteplan) > 0 {
  971. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  972. Node: commonConfig.RosNode,
  973. Topic: topic,
  974. Callback: func(data *pjisuv_msgs.RoutePlan) {
  975. if len(masterConfig.TopicOfTprouteplan) == 0 {
  976. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  977. return
  978. }
  979. subscribersTimeMutexes[i].Lock()
  980. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  981. subscribersMutexes[i].Lock()
  982. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  983. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  984. var faultLabel string
  985. for _, f := range masterConfig.RuleOfTprouteplan {
  986. faultLabel = f(data)
  987. if faultLabel != "" {
  988. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  989. subscribersTimes[i] = time.Now()
  990. break
  991. }
  992. }
  993. subscribersMutexes[i].Unlock()
  994. }
  995. subscribersTimeMutexes[i].Unlock()
  996. },
  997. })
  998. }
  999. // 33
  1000. if topic == masterConfig.TopicOfTrajectoryDisplay && len(masterConfig.RuleOfTrajectoryDisplay) > 0 {
  1001. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1002. Node: commonConfig.RosNode,
  1003. Topic: topic,
  1004. Callback: func(data *nav_msgs.Path) {
  1005. if len(masterConfig.TopicOfTrajectoryDisplay) == 0 {
  1006. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  1007. return
  1008. }
  1009. subscribersTimeMutexes[i].Lock()
  1010. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1011. subscribersMutexes[i].Lock()
  1012. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1013. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  1014. var faultLabel string
  1015. for _, f := range masterConfig.RuleOfTrajectoryDisplay {
  1016. faultLabel = f(data)
  1017. if faultLabel != "" {
  1018. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1019. subscribersTimes[i] = time.Now()
  1020. break
  1021. }
  1022. }
  1023. subscribersMutexes[i].Unlock()
  1024. }
  1025. subscribersTimeMutexes[i].Unlock()
  1026. },
  1027. })
  1028. }
  1029. // 34
  1030. if topic == masterConfig.TopicOfUngroundCloudpoints && len(masterConfig.RuleOfUngroundCloudpoints) > 0 {
  1031. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1032. Node: commonConfig.RosNode,
  1033. Topic: topic,
  1034. Callback: func(data *sensor_msgs.PointCloud2) {
  1035. if len(masterConfig.TopicOfUngroundCloudpoints) == 0 {
  1036. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  1037. return
  1038. }
  1039. subscribersTimeMutexes[i].Lock()
  1040. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1041. subscribersMutexes[i].Lock()
  1042. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1043. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  1044. var faultLabel string
  1045. for _, f := range masterConfig.RuleOfUngroundCloudpoints {
  1046. faultLabel = f(data)
  1047. if faultLabel != "" {
  1048. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1049. subscribersTimes[i] = time.Now()
  1050. break
  1051. }
  1052. }
  1053. subscribersMutexes[i].Unlock()
  1054. }
  1055. subscribersTimeMutexes[i].Unlock()
  1056. },
  1057. })
  1058. }
  1059. // 35
  1060. if topic == masterConfig.TopicOfDataRead && len(masterConfig.RuleOfDataRead) > 0 {
  1061. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1062. Node: commonConfig.RosNode,
  1063. Topic: topic,
  1064. Callback: func(data *pjisuv_msgs.Retrieval) {
  1065. if len(masterConfig.TopicOfDataRead) == 0 {
  1066. c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
  1067. return
  1068. }
  1069. subscribersTimeMutexes[i].Lock()
  1070. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1071. subscribersMutexes[i].Lock()
  1072. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1073. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  1074. var faultLabel string
  1075. for _, f := range masterConfig.RuleOfDataRead {
  1076. faultLabel = f(data)
  1077. if faultLabel != "" {
  1078. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1079. subscribersTimes[i] = time.Now()
  1080. break
  1081. }
  1082. }
  1083. subscribersMutexes[i].Unlock()
  1084. }
  1085. subscribersTimeMutexes[i].Unlock()
  1086. },
  1087. })
  1088. }
  1089. if err != nil {
  1090. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1091. //TODO 如何回传日志
  1092. continue
  1093. }
  1094. }
  1095. select {
  1096. case signal := <-service.ChannelKillWindowProducer:
  1097. if signal == 1 {
  1098. commonConfig.RosNode.Close()
  1099. service.AddKillTimes("3")
  1100. return
  1101. }
  1102. }
  1103. }
  1104. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) {
  1105. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1106. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  1107. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  1108. newTimeWindow := entity.TimeWindow{
  1109. FaultTime: faultHappenTime,
  1110. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  1111. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  1112. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  1113. Labels: []string{faultLabel},
  1114. MasterTopics: masterTopics,
  1115. SlaveTopics: slaveTopics,
  1116. }
  1117. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1118. entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1119. } else {
  1120. // 2-2 如果在旧故障窗口内
  1121. entity.TimeWindowProducerQueueMutex.RLock()
  1122. defer entity.TimeWindowProducerQueueMutex.RUnlock()
  1123. // 2-2-1 更新故障窗口end时间
  1124. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  1125. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1126. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  1127. lastTimeWindow.TimeWindowEnd = maxEnd
  1128. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  1129. } else {
  1130. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1131. lastTimeWindow.TimeWindowEnd = expectEnd
  1132. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1133. }
  1134. }
  1135. // 2-2-2 更新label
  1136. labels := lastTimeWindow.Labels
  1137. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1138. // 2-2-3 更新 topic
  1139. sourceMasterTopics := lastTimeWindow.MasterTopics
  1140. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1141. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1142. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1143. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1144. }
  1145. }
  1146. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1147. // 获取所有需要采集的topic
  1148. var faultCodeTopics []string
  1149. for _, code := range commonConfig.CloudConfig.Triggers {
  1150. if code.Label == faultLabel {
  1151. faultCodeTopics = code.Topics
  1152. }
  1153. }
  1154. // 根据不同节点采集的topic进行分配采集
  1155. for _, acceptTopic := range faultCodeTopics {
  1156. for _, host := range commonConfig.CloudConfig.Hosts {
  1157. for _, topic := range host.Topics {
  1158. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1159. masterTopics = append(masterTopics, acceptTopic)
  1160. }
  1161. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1162. slaveTopics = append(slaveTopics, acceptTopic)
  1163. }
  1164. }
  1165. }
  1166. }
  1167. return masterTopics, slaveTopics
  1168. }