produce_window.go 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "cicv-data-closedloop/pjisuv_param"
  11. "github.com/bluenviron/goroslib/v2"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  16. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  17. "math"
  18. "sync"
  19. "time"
  20. )
  21. // 所有共享变量
  22. var (
  23. pjisuvParam = pjisuv_param.PjisuvParam{
  24. ObjDicOfTpperception: make(map[uint32][]float32),
  25. ObjTypeDicOfTpperception: make(map[uint32]uint8),
  26. ObjSpeedDicOfTpperception: make(map[uint32]float64),
  27. } // /cicv_location
  28. mutexOfCicvLocation sync.RWMutex
  29. // /tpperception
  30. mutexOfTpperception sync.RWMutex
  31. // /pj_control_pub
  32. mutexOfPjControlPub sync.RWMutex
  33. // /data_read
  34. mutexOfDataRead sync.RWMutex
  35. // /pj_vehicle_fdb_pub
  36. mutexOfPjVehicleFdbPub sync.RWMutex
  37. // /pj_vehicle_fdb_pub
  38. mutexOfCicvAmrTrajectory sync.RWMutex
  39. )
  40. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  41. func PrepareTimeWindowProducerQueue() {
  42. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  43. var err error
  44. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  45. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  46. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  47. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  48. for i, topic := range commonConfig.SubscribeTopics {
  49. // 增加了可扩展性
  50. if topic == masterConfig.TopicOfCicvExtend {
  51. go func() {
  52. for {
  53. time.Sleep(time.Duration(3500) * time.Millisecond)
  54. for _, f := range masterConfig.RuleOfCicvExtend {
  55. label := f(pjisuvParam)
  56. if label != "" {
  57. saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
  58. break
  59. }
  60. }
  61. }
  62. }()
  63. }
  64. // 其他常规监听器
  65. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  66. // 1
  67. if topic == masterConfig.TopicOfAmrPose && len(masterConfig.RuleOfAmrPose) > 0 {
  68. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  69. Node: commonConfig.RosNode,
  70. Topic: topic,
  71. Callback: func(data *visualization_msgs.MarkerArray) {
  72. subscribersTimeMutexes[i].Lock()
  73. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  74. subscribersMutexes[i].Lock()
  75. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  76. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  77. var faultLabel string
  78. for _, f := range masterConfig.RuleOfAmrPose {
  79. faultLabel = f(data)
  80. if faultLabel != "" {
  81. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  82. subscribersTimes[i] = time.Now()
  83. break
  84. }
  85. }
  86. subscribersMutexes[i].Unlock()
  87. }
  88. subscribersTimeMutexes[i].Unlock()
  89. },
  90. })
  91. }
  92. // 2
  93. if topic == masterConfig.TopicOfBoundingBoxesFast && len(masterConfig.RuleOfBoundingBoxesFast) > 0 {
  94. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  95. Node: commonConfig.RosNode,
  96. Topic: topic,
  97. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  98. subscribersTimeMutexes[i].Lock()
  99. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  100. subscribersMutexes[i].Lock()
  101. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  102. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  103. var faultLabel string
  104. for _, f := range masterConfig.RuleOfBoundingBoxesFast {
  105. faultLabel = f(data)
  106. if faultLabel != "" {
  107. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  108. subscribersTimes[i] = time.Now()
  109. break
  110. }
  111. }
  112. subscribersMutexes[i].Unlock()
  113. }
  114. subscribersTimeMutexes[i].Unlock()
  115. },
  116. })
  117. }
  118. // 3
  119. if topic == masterConfig.TopicOfCameraFault && len(masterConfig.RuleOfCameraFault) > 0 {
  120. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  121. Node: commonConfig.RosNode,
  122. Topic: topic,
  123. Callback: func(data *pjisuv_msgs.FaultVec) {
  124. subscribersTimeMutexes[i].Lock()
  125. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  126. subscribersMutexes[i].Lock()
  127. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  128. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  129. var faultLabel string
  130. for _, f := range masterConfig.RuleOfCameraFault {
  131. faultLabel = f(data)
  132. if faultLabel != "" {
  133. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  134. subscribersTimes[i] = time.Now()
  135. break
  136. }
  137. }
  138. subscribersMutexes[i].Unlock()
  139. }
  140. subscribersTimeMutexes[i].Unlock()
  141. },
  142. })
  143. }
  144. // 4
  145. if topic == masterConfig.TopicOfCanData && len(masterConfig.RuleOfCanData) > 0 {
  146. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  147. Node: commonConfig.RosNode,
  148. Topic: topic,
  149. Callback: func(data *pjisuv_msgs.Frame) {
  150. subscribersTimeMutexes[i].Lock()
  151. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  152. subscribersMutexes[i].Lock()
  153. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  154. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  155. var faultLabel string
  156. for _, f := range masterConfig.RuleOfCanData {
  157. faultLabel = f(data)
  158. if faultLabel != "" {
  159. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  160. subscribersTimes[i] = time.Now()
  161. break
  162. }
  163. }
  164. subscribersMutexes[i].Unlock()
  165. }
  166. subscribersTimeMutexes[i].Unlock()
  167. },
  168. })
  169. }
  170. // 5
  171. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && len(masterConfig.RuleOfCh128x1LslidarPointCloud) > 0 {
  172. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  173. Node: commonConfig.RosNode,
  174. Topic: topic,
  175. Callback: func(data *sensor_msgs.PointCloud2) {
  176. subscribersTimeMutexes[i].Lock()
  177. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  178. subscribersMutexes[i].Lock()
  179. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  180. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  181. var faultLabel string
  182. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud {
  183. faultLabel = f(data)
  184. if faultLabel != "" {
  185. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  186. subscribersTimes[i] = time.Now()
  187. break
  188. }
  189. }
  190. subscribersMutexes[i].Unlock()
  191. }
  192. subscribersTimeMutexes[i].Unlock()
  193. },
  194. })
  195. }
  196. // 6
  197. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && len(masterConfig.RuleOfCh64wLLslidarPointCloud) > 0 {
  198. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  199. Node: commonConfig.RosNode,
  200. Topic: topic,
  201. Callback: func(data *sensor_msgs.PointCloud2) {
  202. subscribersTimeMutexes[i].Lock()
  203. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  204. subscribersMutexes[i].Lock()
  205. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  206. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  207. var faultLabel string
  208. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud {
  209. faultLabel = f(data)
  210. if faultLabel != "" {
  211. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  212. subscribersTimes[i] = time.Now()
  213. break
  214. }
  215. }
  216. subscribersMutexes[i].Unlock()
  217. }
  218. subscribersTimeMutexes[i].Unlock()
  219. },
  220. })
  221. }
  222. // 7
  223. if topic == masterConfig.TopicOfCh64wLScan && len(masterConfig.RuleOfCh64wLScan) > 0 {
  224. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  225. Node: commonConfig.RosNode,
  226. Topic: topic,
  227. Callback: func(data *sensor_msgs.LaserScan) {
  228. subscribersTimeMutexes[i].Lock()
  229. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  230. subscribersMutexes[i].Lock()
  231. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  232. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  233. var faultLabel string
  234. for _, f := range masterConfig.RuleOfCh64wLScan {
  235. faultLabel = f(data)
  236. if faultLabel != "" {
  237. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  238. subscribersTimes[i] = time.Now()
  239. break
  240. }
  241. }
  242. subscribersMutexes[i].Unlock()
  243. }
  244. subscribersTimeMutexes[i].Unlock()
  245. },
  246. })
  247. }
  248. // 8
  249. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && len(masterConfig.RuleOfCh64wRLslidarPointCloud) > 0 {
  250. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  251. Node: commonConfig.RosNode,
  252. Topic: topic,
  253. Callback: func(data *sensor_msgs.PointCloud2) {
  254. subscribersTimeMutexes[i].Lock()
  255. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  256. subscribersMutexes[i].Lock()
  257. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  258. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  259. var faultLabel string
  260. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud {
  261. faultLabel = f(data)
  262. if faultLabel != "" {
  263. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  264. subscribersTimes[i] = time.Now()
  265. break
  266. }
  267. }
  268. subscribersMutexes[i].Unlock()
  269. }
  270. subscribersTimeMutexes[i].Unlock()
  271. },
  272. })
  273. }
  274. // 9
  275. if topic == masterConfig.TopicOfCh64wRScan && len(masterConfig.RuleOfCh64wRScan) > 0 {
  276. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  277. Node: commonConfig.RosNode,
  278. Topic: topic,
  279. Callback: func(data *sensor_msgs.LaserScan) {
  280. subscribersTimeMutexes[i].Lock()
  281. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  282. subscribersMutexes[i].Lock()
  283. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  284. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  285. var faultLabel string
  286. for _, f := range masterConfig.RuleOfCh64wRScan {
  287. faultLabel = f(data)
  288. if faultLabel != "" {
  289. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  290. subscribersTimes[i] = time.Now()
  291. break
  292. }
  293. }
  294. subscribersMutexes[i].Unlock()
  295. }
  296. subscribersTimeMutexes[i].Unlock()
  297. },
  298. })
  299. }
  300. // 10
  301. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && len(masterConfig.RuleOfCicvLidarclusterMovingObjects) > 0 {
  302. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  303. Node: commonConfig.RosNode,
  304. Topic: topic,
  305. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  306. subscribersTimeMutexes[i].Lock()
  307. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  308. subscribersMutexes[i].Lock()
  309. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  310. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  311. var faultLabel string
  312. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects {
  313. faultLabel = f(data)
  314. if faultLabel != "" {
  315. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  316. subscribersTimes[i] = time.Now()
  317. break
  318. }
  319. }
  320. subscribersMutexes[i].Unlock()
  321. }
  322. subscribersTimeMutexes[i].Unlock()
  323. },
  324. })
  325. }
  326. // 11
  327. if topic == masterConfig.TopicOfCicvAmrTrajectory && len(masterConfig.RuleOfCicvAmrTrajectory) > 0 {
  328. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  329. Node: commonConfig.RosNode,
  330. Topic: topic,
  331. Callback: func(data *pjisuv_msgs.Trajectory) {
  332. subscribersTimeMutexes[i].Lock()
  333. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  334. subscribersMutexes[i].Lock()
  335. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  336. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  337. var faultLabel string
  338. for _, f := range masterConfig.RuleOfCicvAmrTrajectory {
  339. faultLabel = f(data, pjisuvParam)
  340. if faultLabel != "" {
  341. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  342. subscribersTimes[i] = time.Now()
  343. break
  344. }
  345. }
  346. subscribersMutexes[i].Unlock()
  347. }
  348. subscribersTimeMutexes[i].Unlock()
  349. // 触发后更新共享变量
  350. mutexOfCicvAmrTrajectory.RLock()
  351. {
  352. var currentCurvateres []float64
  353. for _, point := range data.Trajectoryinfo.Trajectorypoints {
  354. currentCurvateres = append(currentCurvateres, math.Abs(float64(point.Curvature)))
  355. }
  356. pjisuvParam.LastCurvaturesOfCicvAmrTrajectory = currentCurvateres
  357. }
  358. mutexOfCicvAmrTrajectory.RUnlock()
  359. },
  360. })
  361. }
  362. // 12
  363. if topic == masterConfig.TopicOfCicvLocation && (len(masterConfig.RuleOfCicvLocation1) > 0 || len(masterConfig.RuleOfCicvLocation2) > 0) {
  364. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  365. Node: commonConfig.RosNode,
  366. Topic: topic,
  367. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  368. // 更新共享变量
  369. mutexOfCicvLocation.RLock()
  370. {
  371. pjisuvParam.VelocityXOfCicvLocation = data.VelocityX
  372. pjisuvParam.VelocityYOfCicvLocation = data.VelocityY
  373. pjisuvParam.VelocityZOfCicvLocation = data.VelocityZ
  374. pjisuvParam.YawOfCicvLocation = data.Yaw
  375. pjisuvParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
  376. pjisuvParam.PositionXOfCicvLocation = data.PositionX
  377. pjisuvParam.PositionYOfCicvLocation = data.PositionY
  378. }
  379. mutexOfCicvLocation.RUnlock()
  380. subscribersTimeMutexes[i].Lock()
  381. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  382. subscribersMutexes[i].Lock()
  383. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  384. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  385. faultLabel := ""
  386. if len(masterConfig.RuleOfCicvLocation1) > 0 {
  387. for _, f := range masterConfig.RuleOfCicvLocation1 {
  388. faultLabel = f(data)
  389. if faultLabel != "" {
  390. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  391. break
  392. }
  393. }
  394. }
  395. if faultLabel == "" && len(masterConfig.RuleOfCicvLocation2) > 0 {
  396. for _, f := range masterConfig.RuleOfCicvLocation2 {
  397. faultLabel = f(data, pjisuvParam)
  398. if faultLabel != "" {
  399. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  400. break
  401. }
  402. }
  403. }
  404. subscribersMutexes[i].Unlock()
  405. }
  406. subscribersTimeMutexes[i].Unlock()
  407. },
  408. })
  409. }
  410. // 13
  411. if topic == masterConfig.TopicOfCloudClusters && len(masterConfig.RuleOfCloudClusters) > 0 {
  412. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  413. Node: commonConfig.RosNode,
  414. Topic: topic,
  415. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  416. subscribersTimeMutexes[i].Lock()
  417. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  418. subscribersMutexes[i].Lock()
  419. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  420. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  421. var faultLabel string
  422. for _, f := range masterConfig.RuleOfCloudClusters {
  423. faultLabel = f(data)
  424. if faultLabel != "" {
  425. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  426. subscribersTimes[i] = time.Now()
  427. break
  428. }
  429. }
  430. subscribersMutexes[i].Unlock()
  431. }
  432. subscribersTimeMutexes[i].Unlock()
  433. },
  434. })
  435. }
  436. // 14
  437. if topic == masterConfig.TopicOfHeartbeatInfo && len(masterConfig.RuleOfHeartbeatInfo) > 0 {
  438. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  439. Node: commonConfig.RosNode,
  440. Topic: topic,
  441. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  442. subscribersTimeMutexes[i].Lock()
  443. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  444. subscribersMutexes[i].Lock()
  445. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  446. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  447. var faultLabel string
  448. for _, f := range masterConfig.RuleOfHeartbeatInfo {
  449. faultLabel = f(data)
  450. if faultLabel != "" {
  451. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  452. subscribersTimes[i] = time.Now()
  453. break
  454. }
  455. }
  456. subscribersMutexes[i].Unlock()
  457. }
  458. subscribersTimeMutexes[i].Unlock()
  459. },
  460. })
  461. }
  462. // 15
  463. if topic == masterConfig.TopicOfLidarPretreatmentCost && len(masterConfig.RuleOfLidarPretreatmentCost) > 0 {
  464. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  465. Node: commonConfig.RosNode,
  466. Topic: topic,
  467. Callback: func(data *geometry_msgs.Vector3Stamped) {
  468. subscribersTimeMutexes[i].Lock()
  469. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  470. subscribersMutexes[i].Lock()
  471. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  472. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  473. var faultLabel string
  474. for _, f := range masterConfig.RuleOfLidarPretreatmentCost {
  475. faultLabel = f(data)
  476. if faultLabel != "" {
  477. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  478. subscribersTimes[i] = time.Now()
  479. break
  480. }
  481. }
  482. subscribersMutexes[i].Unlock()
  483. }
  484. subscribersTimeMutexes[i].Unlock()
  485. },
  486. })
  487. }
  488. // 16
  489. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && len(masterConfig.RuleOfLidarPretreatmentOdometry) > 0 {
  490. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  491. Node: commonConfig.RosNode,
  492. Topic: topic,
  493. Callback: func(data *nav_msgs.Odometry) {
  494. subscribersTimeMutexes[i].Lock()
  495. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  496. subscribersMutexes[i].Lock()
  497. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  498. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  499. var faultLabel string
  500. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry {
  501. faultLabel = f(data)
  502. if faultLabel != "" {
  503. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  504. subscribersTimes[i] = time.Now()
  505. break
  506. }
  507. }
  508. subscribersMutexes[i].Unlock()
  509. }
  510. subscribersTimeMutexes[i].Unlock()
  511. },
  512. })
  513. }
  514. // 17
  515. if topic == masterConfig.TopicOfLidarRoi && len(masterConfig.RuleOfLidarRoi) > 0 {
  516. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  517. Node: commonConfig.RosNode,
  518. Topic: topic,
  519. Callback: func(data *geometry_msgs.PolygonStamped) {
  520. subscribersTimeMutexes[i].Lock()
  521. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  522. subscribersMutexes[i].Lock()
  523. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  524. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  525. var faultLabel string
  526. for _, f := range masterConfig.RuleOfLidarRoi {
  527. faultLabel = f(data)
  528. if faultLabel != "" {
  529. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  530. subscribersTimes[i] = time.Now()
  531. break
  532. }
  533. }
  534. subscribersMutexes[i].Unlock()
  535. }
  536. subscribersTimeMutexes[i].Unlock()
  537. },
  538. })
  539. }
  540. // 18
  541. if topic == masterConfig.TopicOfLine1 && len(masterConfig.RuleOfLine1) > 0 {
  542. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  543. Node: commonConfig.RosNode,
  544. Topic: topic,
  545. Callback: func(data *nav_msgs.Path) {
  546. subscribersTimeMutexes[i].Lock()
  547. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  548. subscribersMutexes[i].Lock()
  549. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  550. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  551. var faultLabel string
  552. for _, f := range masterConfig.RuleOfLine1 {
  553. faultLabel = f(data)
  554. if faultLabel != "" {
  555. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  556. subscribersTimes[i] = time.Now()
  557. break
  558. }
  559. }
  560. subscribersMutexes[i].Unlock()
  561. }
  562. subscribersTimeMutexes[i].Unlock()
  563. },
  564. })
  565. }
  566. // 19
  567. if topic == masterConfig.TopicOfLine2 && len(masterConfig.RuleOfLine2) > 0 {
  568. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  569. Node: commonConfig.RosNode,
  570. Topic: topic,
  571. Callback: func(data *nav_msgs.Path) {
  572. subscribersTimeMutexes[i].Lock()
  573. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  574. subscribersMutexes[i].Lock()
  575. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  576. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  577. var faultLabel string
  578. for _, f := range masterConfig.RuleOfLine2 {
  579. faultLabel = f(data)
  580. if faultLabel != "" {
  581. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  582. subscribersTimes[i] = time.Now()
  583. break
  584. }
  585. }
  586. subscribersMutexes[i].Unlock()
  587. }
  588. subscribersTimeMutexes[i].Unlock()
  589. },
  590. })
  591. }
  592. // 20
  593. if topic == masterConfig.TopicOfMapPolygon && len(masterConfig.RuleOfMapPolygon) > 0 {
  594. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  595. Node: commonConfig.RosNode,
  596. Topic: topic,
  597. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  598. subscribersTimeMutexes[i].Lock()
  599. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  600. subscribersMutexes[i].Lock()
  601. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  602. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  603. var faultLabel string
  604. for _, f := range masterConfig.RuleOfMapPolygon {
  605. faultLabel = f(data, pjisuvParam)
  606. if faultLabel != "" {
  607. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  608. subscribersTimes[i] = time.Now()
  609. break
  610. }
  611. }
  612. subscribersMutexes[i].Unlock()
  613. }
  614. subscribersTimeMutexes[i].Unlock()
  615. },
  616. })
  617. }
  618. // 21
  619. if topic == masterConfig.TopicOfObstacleDisplay && len(masterConfig.RuleOfObstacleDisplay) > 0 {
  620. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  621. Node: commonConfig.RosNode,
  622. Topic: topic,
  623. Callback: func(data *visualization_msgs.MarkerArray) {
  624. subscribersTimeMutexes[i].Lock()
  625. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  626. subscribersMutexes[i].Lock()
  627. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  628. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  629. var faultLabel string
  630. for _, f := range masterConfig.RuleOfObstacleDisplay {
  631. faultLabel = f(data)
  632. if faultLabel != "" {
  633. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  634. subscribersTimes[i] = time.Now()
  635. break
  636. }
  637. }
  638. subscribersMutexes[i].Unlock()
  639. }
  640. subscribersTimeMutexes[i].Unlock()
  641. },
  642. })
  643. }
  644. // 22
  645. if topic == masterConfig.TopicOfPjControlPub && len(masterConfig.RuleOfPjControlPub) > 0 {
  646. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  647. Node: commonConfig.RosNode,
  648. Topic: topic,
  649. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  650. // 更新共享变量
  651. mutexOfPjControlPub.RLock()
  652. {
  653. pjisuvParam.NumCountPjiControlCommandOfPjControlPub++
  654. if pjisuvParam.NumCountPjiControlCommandOfPjControlPub == 10 {
  655. pjisuvParam.EgoSteeringCmdOfPjControlPub = append(pjisuvParam.EgoSteeringCmdOfPjControlPub, data.ICPVCmdStrAngle)
  656. pjisuvParam.EgoThrottleCmdOfPjControlPub = append(pjisuvParam.EgoThrottleCmdOfPjControlPub, data.ICPVCmdAccPelPosAct)
  657. pjisuvParam.NumCountPjiControlCommandOfPjControlPub = 0
  658. }
  659. }
  660. mutexOfPjControlPub.RUnlock()
  661. subscribersTimeMutexes[i].Lock()
  662. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  663. subscribersMutexes[i].Lock()
  664. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  665. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  666. var faultLabel string
  667. for _, f := range masterConfig.RuleOfPjControlPub {
  668. faultLabel = f(data)
  669. if faultLabel != "" {
  670. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  671. subscribersTimes[i] = time.Now()
  672. break
  673. }
  674. }
  675. subscribersMutexes[i].Unlock()
  676. }
  677. subscribersTimeMutexes[i].Unlock()
  678. },
  679. })
  680. }
  681. // 23
  682. if topic == masterConfig.TopicOfPointsCluster && len(masterConfig.RuleOfPointsCluster) > 0 {
  683. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  684. Node: commonConfig.RosNode,
  685. Topic: topic,
  686. Callback: func(data *sensor_msgs.PointCloud2) {
  687. subscribersTimeMutexes[i].Lock()
  688. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  689. subscribersMutexes[i].Lock()
  690. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  691. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  692. var faultLabel string
  693. for _, f := range masterConfig.RuleOfPointsCluster {
  694. faultLabel = f(data)
  695. if faultLabel != "" {
  696. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  697. subscribersTimes[i] = time.Now()
  698. break
  699. }
  700. }
  701. subscribersMutexes[i].Unlock()
  702. }
  703. subscribersTimeMutexes[i].Unlock()
  704. },
  705. })
  706. }
  707. // 24
  708. if topic == masterConfig.TopicOfPointsConcat && len(masterConfig.RuleOfPointsConcat) > 0 {
  709. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  710. Node: commonConfig.RosNode,
  711. Topic: topic,
  712. Callback: func(data *sensor_msgs.PointCloud2) {
  713. subscribersTimeMutexes[i].Lock()
  714. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  715. subscribersMutexes[i].Lock()
  716. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  717. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  718. var faultLabel string
  719. for _, f := range masterConfig.RuleOfPointsConcat {
  720. faultLabel = f(data)
  721. if faultLabel != "" {
  722. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  723. subscribersTimes[i] = time.Now()
  724. break
  725. }
  726. }
  727. subscribersMutexes[i].Unlock()
  728. }
  729. subscribersTimeMutexes[i].Unlock()
  730. },
  731. })
  732. }
  733. // 25
  734. if topic == masterConfig.TopicOfReferenceDisplay && len(masterConfig.RuleOfReferenceDisplay) > 0 {
  735. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  736. Node: commonConfig.RosNode,
  737. Topic: topic,
  738. Callback: func(data *nav_msgs.Path) {
  739. subscribersTimeMutexes[i].Lock()
  740. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  741. subscribersMutexes[i].Lock()
  742. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  743. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  744. var faultLabel string
  745. for _, f := range masterConfig.RuleOfReferenceDisplay {
  746. faultLabel = f(data)
  747. if faultLabel != "" {
  748. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  749. subscribersTimes[i] = time.Now()
  750. break
  751. }
  752. }
  753. subscribersMutexes[i].Unlock()
  754. }
  755. subscribersTimeMutexes[i].Unlock()
  756. },
  757. })
  758. }
  759. // 26
  760. if topic == masterConfig.TopicOfReferenceTrajectory && len(masterConfig.RuleOfReferenceTrajectory) > 0 {
  761. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  762. Node: commonConfig.RosNode,
  763. Topic: topic,
  764. Callback: func(data *pjisuv_msgs.Trajectory) {
  765. subscribersTimeMutexes[i].Lock()
  766. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  767. subscribersMutexes[i].Lock()
  768. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  769. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  770. var faultLabel string
  771. for _, f := range masterConfig.RuleOfReferenceTrajectory {
  772. faultLabel = f(data)
  773. if faultLabel != "" {
  774. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  775. subscribersTimes[i] = time.Now()
  776. break
  777. }
  778. }
  779. subscribersMutexes[i].Unlock()
  780. }
  781. subscribersTimeMutexes[i].Unlock()
  782. },
  783. })
  784. }
  785. // 27
  786. if topic == masterConfig.TopicOfRoiPoints && len(masterConfig.RuleOfRoiPoints) > 0 {
  787. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  788. Node: commonConfig.RosNode,
  789. Topic: topic,
  790. Callback: func(data *sensor_msgs.PointCloud2) {
  791. subscribersTimeMutexes[i].Lock()
  792. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  793. subscribersMutexes[i].Lock()
  794. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  795. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  796. var faultLabel string
  797. for _, f := range masterConfig.RuleOfRoiPoints {
  798. faultLabel = f(data)
  799. if faultLabel != "" {
  800. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  801. subscribersTimes[i] = time.Now()
  802. break
  803. }
  804. }
  805. subscribersMutexes[i].Unlock()
  806. }
  807. subscribersTimeMutexes[i].Unlock()
  808. },
  809. })
  810. }
  811. // 28
  812. if topic == masterConfig.TopicOfRoiPolygon && len(masterConfig.RuleOfRoiPolygon) > 0 {
  813. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  814. Node: commonConfig.RosNode,
  815. Topic: topic,
  816. Callback: func(data *nav_msgs.Path) {
  817. subscribersTimeMutexes[i].Lock()
  818. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  819. subscribersMutexes[i].Lock()
  820. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  821. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  822. var faultLabel string
  823. for _, f := range masterConfig.RuleOfRoiPolygon {
  824. faultLabel = f(data)
  825. if faultLabel != "" {
  826. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  827. subscribersTimes[i] = time.Now()
  828. break
  829. }
  830. }
  831. subscribersMutexes[i].Unlock()
  832. }
  833. subscribersTimeMutexes[i].Unlock()
  834. },
  835. })
  836. }
  837. // 29
  838. if topic == masterConfig.TopicOfTf && len(masterConfig.RuleOfTf) > 0 {
  839. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  840. Node: commonConfig.RosNode,
  841. Topic: topic,
  842. Callback: func(data *tf2_msgs.TFMessage) {
  843. subscribersTimeMutexes[i].Lock()
  844. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  845. subscribersMutexes[i].Lock()
  846. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  847. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  848. var faultLabel string
  849. for _, f := range masterConfig.RuleOfTf {
  850. faultLabel = f(data)
  851. if faultLabel != "" {
  852. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  853. subscribersTimes[i] = time.Now()
  854. break
  855. }
  856. }
  857. subscribersMutexes[i].Unlock()
  858. }
  859. subscribersTimeMutexes[i].Unlock()
  860. },
  861. })
  862. }
  863. // 30
  864. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  865. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  866. Node: commonConfig.RosNode,
  867. Topic: topic,
  868. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  869. // 更新共享变量
  870. mutexOfTpperception.RLock()
  871. {
  872. for _, obj := range data.Objs {
  873. if obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  874. continue
  875. }
  876. // 检查 ObjDicOfTpperception 是否为 nil,如果是,则初始化它
  877. if pjisuvParam.ObjDicOfTpperception == nil {
  878. pjisuvParam.ObjDicOfTpperception = make(map[uint32][]float32)
  879. }
  880. if _, ok := pjisuvParam.ObjDicOfTpperception[obj.Id]; !ok {
  881. pjisuvParam.ObjDicOfTpperception[obj.Id] = []float32{}
  882. }
  883. pjisuvParam.ObjDicOfTpperception[obj.Id] = append(pjisuvParam.ObjDicOfTpperception[obj.Id], obj.Y)
  884. }
  885. }
  886. mutexOfTpperception.RUnlock()
  887. subscribersTimeMutexes[i].Lock()
  888. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  889. subscribersMutexes[i].Lock()
  890. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  891. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  892. var faultLabel string
  893. for _, f := range masterConfig.RuleOfTpperception {
  894. faultLabel = f(data, pjisuvParam)
  895. if faultLabel != "" {
  896. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  897. break
  898. }
  899. }
  900. subscribersMutexes[i].Unlock()
  901. }
  902. subscribersTimeMutexes[i].Unlock()
  903. // -------- 触发后更新共享变量
  904. mutexOfTpperception.RLock()
  905. {
  906. for _, obj := range data.Objs {
  907. pjisuvParam.ObjTypeDicOfTpperception[obj.Id] = obj.Type
  908. pjisuvParam.ObjSpeedDicOfTpperception[obj.Id] = math.Pow(math.Pow(float64(obj.Vxabs), 2)+math.Pow(float64(obj.Vyabs), 2), 0.5)
  909. }
  910. }
  911. mutexOfTpperception.RUnlock()
  912. },
  913. })
  914. }
  915. // 31
  916. if topic == masterConfig.TopicOfTpperceptionVis && len(masterConfig.RuleOfTpperceptionVis) > 0 {
  917. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  918. Node: commonConfig.RosNode,
  919. Topic: topic,
  920. Callback: func(data *visualization_msgs.MarkerArray) {
  921. subscribersTimeMutexes[i].Lock()
  922. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  923. subscribersMutexes[i].Lock()
  924. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  925. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  926. var faultLabel string
  927. for _, f := range masterConfig.RuleOfTpperceptionVis {
  928. faultLabel = f(data)
  929. if faultLabel != "" {
  930. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  931. subscribersTimes[i] = time.Now()
  932. break
  933. }
  934. }
  935. subscribersMutexes[i].Unlock()
  936. }
  937. subscribersTimeMutexes[i].Unlock()
  938. },
  939. })
  940. }
  941. // 32
  942. if topic == masterConfig.TopicOfTprouteplan && len(masterConfig.RuleOfTprouteplan) > 0 {
  943. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  944. Node: commonConfig.RosNode,
  945. Topic: topic,
  946. Callback: func(data *pjisuv_msgs.RoutePlan) {
  947. subscribersTimeMutexes[i].Lock()
  948. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  949. subscribersMutexes[i].Lock()
  950. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  951. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  952. var faultLabel string
  953. for _, f := range masterConfig.RuleOfTprouteplan {
  954. faultLabel = f(data)
  955. if faultLabel != "" {
  956. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  957. subscribersTimes[i] = time.Now()
  958. break
  959. }
  960. }
  961. subscribersMutexes[i].Unlock()
  962. }
  963. subscribersTimeMutexes[i].Unlock()
  964. },
  965. })
  966. }
  967. // 33
  968. if topic == masterConfig.TopicOfTrajectoryDisplay && len(masterConfig.RuleOfTrajectoryDisplay) > 0 {
  969. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  970. Node: commonConfig.RosNode,
  971. Topic: topic,
  972. Callback: func(data *nav_msgs.Path) {
  973. subscribersTimeMutexes[i].Lock()
  974. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  975. subscribersMutexes[i].Lock()
  976. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  977. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  978. var faultLabel string
  979. for _, f := range masterConfig.RuleOfTrajectoryDisplay {
  980. faultLabel = f(data)
  981. if faultLabel != "" {
  982. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  983. subscribersTimes[i] = time.Now()
  984. break
  985. }
  986. }
  987. subscribersMutexes[i].Unlock()
  988. }
  989. subscribersTimeMutexes[i].Unlock()
  990. },
  991. })
  992. }
  993. // 34
  994. if topic == masterConfig.TopicOfUngroundCloudpoints && len(masterConfig.RuleOfUngroundCloudpoints) > 0 {
  995. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  996. Node: commonConfig.RosNode,
  997. Topic: topic,
  998. Callback: func(data *sensor_msgs.PointCloud2) {
  999. subscribersTimeMutexes[i].Lock()
  1000. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1001. subscribersMutexes[i].Lock()
  1002. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1003. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1004. var faultLabel string
  1005. for _, f := range masterConfig.RuleOfUngroundCloudpoints {
  1006. faultLabel = f(data)
  1007. if faultLabel != "" {
  1008. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1009. subscribersTimes[i] = time.Now()
  1010. break
  1011. }
  1012. }
  1013. subscribersMutexes[i].Unlock()
  1014. }
  1015. subscribersTimeMutexes[i].Unlock()
  1016. },
  1017. })
  1018. }
  1019. // 35
  1020. if topic == masterConfig.TopicOfCameraImage && len(masterConfig.RuleOfCameraImage) > 0 {
  1021. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1022. Node: commonConfig.RosNode,
  1023. Topic: topic,
  1024. Callback: func(data *sensor_msgs.Image) {
  1025. subscribersTimeMutexes[i].Lock()
  1026. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1027. subscribersMutexes[i].Lock()
  1028. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1029. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1030. var faultLabel string
  1031. for _, f := range masterConfig.RuleOfCameraImage {
  1032. faultLabel = f(data)
  1033. if faultLabel != "" {
  1034. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1035. subscribersTimes[i] = time.Now()
  1036. break
  1037. }
  1038. }
  1039. subscribersMutexes[i].Unlock()
  1040. }
  1041. subscribersTimeMutexes[i].Unlock()
  1042. },
  1043. })
  1044. }
  1045. // 36
  1046. if topic == masterConfig.TopicOfDataRead && len(masterConfig.RuleOfDataRead) > 0 {
  1047. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1048. Node: commonConfig.RosNode,
  1049. Topic: topic,
  1050. Callback: func(data *pjisuv_msgs.Retrieval) {
  1051. // 更新共享变量
  1052. mutexOfDataRead.RLock()
  1053. {
  1054. pjisuvParam.NumCountDataReadOfDataRead++
  1055. if pjisuvParam.NumCountDataReadOfDataRead == 10 {
  1056. pjisuvParam.EgoSteeringRealOfDataRead = append(pjisuvParam.EgoSteeringRealOfDataRead, data.ActStrWhAng)
  1057. pjisuvParam.EgoThrottleRealOfDataRead = append(pjisuvParam.EgoThrottleRealOfDataRead, data.AccPed2)
  1058. pjisuvParam.NumCountDataReadOfDataRead = 0
  1059. }
  1060. pjisuvParam.StrgAngleRealValueOfDataRead = data.ActStrWhAng
  1061. }
  1062. mutexOfDataRead.RUnlock()
  1063. subscribersTimeMutexes[i].Lock()
  1064. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1065. subscribersMutexes[i].Lock()
  1066. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1067. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1068. var faultLabel string
  1069. for _, f := range masterConfig.RuleOfDataRead {
  1070. faultLabel = f(data)
  1071. if faultLabel != "" {
  1072. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1073. subscribersTimes[i] = time.Now()
  1074. break
  1075. }
  1076. }
  1077. subscribersMutexes[i].Unlock()
  1078. }
  1079. subscribersTimeMutexes[i].Unlock()
  1080. },
  1081. })
  1082. }
  1083. // 37
  1084. if topic == masterConfig.TopicOfPjiGps && len(masterConfig.RuleOfPjiGps) > 0 {
  1085. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1086. Node: commonConfig.RosNode,
  1087. Topic: topic,
  1088. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  1089. subscribersTimeMutexes[i].Lock()
  1090. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1091. subscribersMutexes[i].Lock()
  1092. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1093. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1094. var faultLabel string
  1095. for _, f := range masterConfig.RuleOfPjiGps {
  1096. faultLabel = f(data)
  1097. if faultLabel != "" {
  1098. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1099. subscribersTimes[i] = time.Now()
  1100. break
  1101. }
  1102. }
  1103. subscribersMutexes[i].Unlock()
  1104. }
  1105. subscribersTimeMutexes[i].Unlock()
  1106. },
  1107. })
  1108. }
  1109. // 39
  1110. if topic == masterConfig.TopicOfPjVehicleFdbPub && len(masterConfig.RuleOfPjVehicleFdbPub) > 0 {
  1111. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1112. Node: commonConfig.RosNode,
  1113. Topic: topic,
  1114. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  1115. subscribersTimeMutexes[i].Lock()
  1116. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1117. subscribersMutexes[i].Lock()
  1118. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1119. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1120. var faultLabel string
  1121. for _, f := range masterConfig.RuleOfPjVehicleFdbPub {
  1122. faultLabel = f(data, &pjisuvParam)
  1123. if faultLabel != "" {
  1124. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1125. subscribersTimes[i] = time.Now()
  1126. break
  1127. }
  1128. }
  1129. subscribersMutexes[i].Unlock()
  1130. }
  1131. subscribersTimeMutexes[i].Unlock()
  1132. },
  1133. })
  1134. }
  1135. if err != nil {
  1136. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1137. //TODO 如何回传日志
  1138. continue
  1139. }
  1140. }
  1141. select {
  1142. case signal := <-service.ChannelKillWindowProducer:
  1143. if signal == 1 {
  1144. commonConfig.RosNode.Close()
  1145. service.AddKillTimes("3")
  1146. return
  1147. }
  1148. }
  1149. }
  1150. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  1151. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1152. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) { // 如果是不在旧故障窗口内,添加一个新窗口
  1153. newTimeWindow := commonEntity.TimeWindow{
  1154. FaultTime: faultHappenTime,
  1155. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  1156. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  1157. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  1158. Labels: []string{faultLabel},
  1159. MasterTopics: masterTopics,
  1160. SlaveTopics: slaveTopics,
  1161. }
  1162. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1163. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1164. } else { // 如果在旧故障窗口内
  1165. commonEntity.TimeWindowProducerQueueMutex.RLock()
  1166. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  1167. // 更新故障窗口end时间
  1168. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  1169. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1170. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  1171. lastTimeWindow.TimeWindowEnd = maxEnd
  1172. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  1173. } else {
  1174. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1175. lastTimeWindow.TimeWindowEnd = expectEnd
  1176. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1177. }
  1178. }
  1179. // 更新label
  1180. labels := lastTimeWindow.Labels
  1181. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1182. // 更新 topic
  1183. sourceMasterTopics := lastTimeWindow.MasterTopics
  1184. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1185. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1186. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1187. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1188. }
  1189. }
  1190. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1191. // 获取所有需要采集的topic
  1192. var faultCodeTopics []string
  1193. for _, code := range commonConfig.CloudConfig.Triggers {
  1194. if code.Label == faultLabel {
  1195. faultCodeTopics = code.Topics
  1196. }
  1197. }
  1198. // 根据不同节点采集的topic进行分配采集
  1199. for _, acceptTopic := range faultCodeTopics {
  1200. for _, host := range commonConfig.CloudConfig.Hosts {
  1201. for _, topic := range host.Topics {
  1202. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1203. masterTopics = append(masterTopics, acceptTopic)
  1204. }
  1205. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1206. slaveTopics = append(slaveTopics, acceptTopic)
  1207. }
  1208. }
  1209. }
  1210. }
  1211. return masterTopics, slaveTopics
  1212. }