produce_window.go 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "cicv-data-closedloop/pjisuv_ticker"
  11. "github.com/bluenviron/goroslib/v2"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  16. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  17. "math"
  18. "sync"
  19. "time"
  20. )
  21. var (
  22. cicvLocationTime = time.Now()
  23. // -----------------------------共享变量
  24. // /tpperception
  25. objDicOfTpperception = make(map[uint32][]float32)
  26. objTypeDicOfTpperception = make(map[uint32]uint8)
  27. objSpeedDicOfTpperception = make(map[uint32]float64)
  28. // /pji_control_pub
  29. numCountPjiControlCommandOfPjControlPub int
  30. egoSteeringCmdOfPjControlPub []float64
  31. egoThrottleCmdOfPjControlPub []float64
  32. // /data_read
  33. numCountDataReadOfDataRead int
  34. egoSteeringRealOfDataRead []float64
  35. egoThrottleRealOfDataRead []float64
  36. // --------------------------------------------------
  37. shareVars = new(sync.Map)
  38. saveTimeWindowMutex sync.Mutex // 保存时间窗口需要锁,防止数据竟态
  39. latestTimeWindowEnd = util.GetTimeCustom(time.Now())
  40. triggerInterval = 3.0 // 每个触发器3秒触发一次
  41. )
  42. // 负责监听所有主题并修改时间窗口
  43. func ProduceWindow() {
  44. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  45. var err error
  46. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  47. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  48. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  49. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  50. for i, topic := range commonConfig.SubscribeTopics {
  51. for {
  52. // 定时器,区别于订阅者
  53. if topic == pjisuv_ticker.TickerTopic {
  54. // 1 把所有触发器函数执行一遍,触发器内部创建额外的定时任务goroutine
  55. for _, f := range masterConfig.RuleOfCicvTicker {
  56. f(shareVars)
  57. }
  58. // 2 创建goroutine接收定时任务触发器返回的Label和Time,执行触发逻辑
  59. go func() {
  60. for {
  61. time.Sleep(time.Duration(1)) // 降低循环频率
  62. select {
  63. case tickInfo := <-pjisuv_ticker.TickerChan:
  64. faultLabel := tickInfo.FaultLabel
  65. faultHappenTime := tickInfo.FaultHappenTime
  66. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  67. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  68. default:
  69. }
  70. }
  71. }()
  72. }
  73. // 其他常规监听器
  74. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  75. // 1
  76. if topic == masterConfig.TopicOfAmrPose && (len(masterConfig.RuleOfAmrPose1) > 0 || len(masterConfig.RuleOfAmrPose3) > 0) {
  77. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  78. Node: commonConfig.RosNode,
  79. Topic: topic,
  80. Callback: func(data *visualization_msgs.MarkerArray) {
  81. subscribersTimeMutexes[i].Lock()
  82. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  83. subscribersMutexes[i].Lock()
  84. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  85. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  86. faultLabel := ""
  87. if len(masterConfig.RuleOfAmrPose1) > 0 {
  88. for _, f := range masterConfig.RuleOfAmrPose1 {
  89. faultLabel = f(data)
  90. if faultLabel != "" {
  91. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  92. subscribersTimes[i] = time.Now()
  93. goto TriggerSuccess
  94. }
  95. }
  96. }
  97. if len(masterConfig.RuleOfAmrPose3) > 0 {
  98. for _, f := range masterConfig.RuleOfAmrPose3 {
  99. faultLabel = f(shareVars, data)
  100. if faultLabel != "" {
  101. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  102. subscribersTimes[i] = time.Now()
  103. goto TriggerSuccess
  104. }
  105. }
  106. }
  107. TriggerSuccess:
  108. subscribersMutexes[i].Unlock()
  109. }
  110. subscribersTimeMutexes[i].Unlock()
  111. },
  112. })
  113. }
  114. // 2
  115. if topic == masterConfig.TopicOfBoundingBoxesFast &&
  116. (len(masterConfig.RuleOfBoundingBoxesFast1) > 0 ||
  117. len(masterConfig.RuleOfBoundingBoxesFast3) > 0) {
  118. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  119. Node: commonConfig.RosNode,
  120. Topic: topic,
  121. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  122. subscribersTimeMutexes[i].Lock()
  123. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  124. subscribersMutexes[i].Lock()
  125. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  126. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  127. faultLabel := ""
  128. if len(masterConfig.RuleOfBoundingBoxesFast1) > 0 {
  129. for _, f := range masterConfig.RuleOfBoundingBoxesFast1 {
  130. faultLabel = f(data)
  131. if faultLabel != "" {
  132. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  133. subscribersTimes[i] = time.Now()
  134. goto TriggerSuccess
  135. }
  136. }
  137. }
  138. if len(masterConfig.RuleOfBoundingBoxesFast3) > 0 {
  139. for _, f := range masterConfig.RuleOfBoundingBoxesFast3 {
  140. faultLabel = f(shareVars, data)
  141. if faultLabel != "" {
  142. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  143. subscribersTimes[i] = time.Now()
  144. goto TriggerSuccess
  145. }
  146. }
  147. }
  148. TriggerSuccess:
  149. subscribersMutexes[i].Unlock()
  150. }
  151. subscribersTimeMutexes[i].Unlock()
  152. },
  153. })
  154. }
  155. // 3
  156. if topic == masterConfig.TopicOfCameraFault &&
  157. (len(masterConfig.RuleOfCameraFault1) > 0 ||
  158. len(masterConfig.RuleOfCameraFault3) > 0) {
  159. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  160. Node: commonConfig.RosNode,
  161. Topic: topic,
  162. Callback: func(data *pjisuv_msgs.FaultVec) {
  163. subscribersTimeMutexes[i].Lock()
  164. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  165. subscribersMutexes[i].Lock()
  166. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  167. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  168. faultLabel := ""
  169. if len(masterConfig.RuleOfCameraFault1) > 0 {
  170. for _, f := range masterConfig.RuleOfCameraFault1 {
  171. faultLabel = f(data)
  172. if faultLabel != "" {
  173. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  174. subscribersTimes[i] = time.Now()
  175. goto TriggerSuccess
  176. }
  177. }
  178. }
  179. if len(masterConfig.RuleOfCameraFault3) > 0 {
  180. for _, f := range masterConfig.RuleOfCameraFault3 {
  181. faultLabel = f(shareVars, data)
  182. if faultLabel != "" {
  183. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  184. subscribersTimes[i] = time.Now()
  185. goto TriggerSuccess
  186. }
  187. }
  188. }
  189. TriggerSuccess:
  190. subscribersMutexes[i].Unlock()
  191. }
  192. subscribersTimeMutexes[i].Unlock()
  193. },
  194. })
  195. }
  196. // 4
  197. if topic == masterConfig.TopicOfCanData &&
  198. (len(masterConfig.RuleOfCanData1) > 0 ||
  199. len(masterConfig.RuleOfCanData3) > 0) {
  200. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  201. Node: commonConfig.RosNode,
  202. Topic: topic,
  203. Callback: func(data *pjisuv_msgs.Frame) {
  204. subscribersTimeMutexes[i].Lock()
  205. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  206. subscribersMutexes[i].Lock()
  207. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  208. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  209. faultLabel := ""
  210. if len(masterConfig.RuleOfCanData1) > 0 {
  211. for _, f := range masterConfig.RuleOfCanData1 {
  212. faultLabel = f(data)
  213. if faultLabel != "" {
  214. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  215. subscribersTimes[i] = time.Now()
  216. goto TriggerSuccess
  217. }
  218. }
  219. }
  220. if len(masterConfig.RuleOfCanData3) > 0 {
  221. for _, f := range masterConfig.RuleOfCanData3 {
  222. faultLabel = f(shareVars, data)
  223. if faultLabel != "" {
  224. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  225. subscribersTimes[i] = time.Now()
  226. goto TriggerSuccess
  227. }
  228. }
  229. }
  230. TriggerSuccess:
  231. subscribersMutexes[i].Unlock()
  232. }
  233. subscribersTimeMutexes[i].Unlock()
  234. },
  235. })
  236. }
  237. // 5
  238. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud &&
  239. (len(masterConfig.RuleOfCh128x1LslidarPointCloud1) > 0 ||
  240. len(masterConfig.RuleOfCh128x1LslidarPointCloud3) > 0) {
  241. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  242. Node: commonConfig.RosNode,
  243. Topic: topic,
  244. Callback: func(data *sensor_msgs.PointCloud2) {
  245. subscribersTimeMutexes[i].Lock()
  246. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  247. subscribersMutexes[i].Lock()
  248. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  249. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  250. faultLabel := ""
  251. if len(masterConfig.RuleOfCh128x1LslidarPointCloud1) > 0 {
  252. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud1 {
  253. faultLabel = f(data)
  254. if faultLabel != "" {
  255. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  256. subscribersTimes[i] = time.Now()
  257. goto TriggerSuccess
  258. }
  259. }
  260. }
  261. if len(masterConfig.RuleOfCh128x1LslidarPointCloud3) > 0 {
  262. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud3 {
  263. faultLabel = f(shareVars, data)
  264. if faultLabel != "" {
  265. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  266. subscribersTimes[i] = time.Now()
  267. goto TriggerSuccess
  268. }
  269. }
  270. }
  271. TriggerSuccess:
  272. subscribersMutexes[i].Unlock()
  273. }
  274. subscribersTimeMutexes[i].Unlock()
  275. },
  276. })
  277. }
  278. // 6
  279. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud &&
  280. (len(masterConfig.RuleOfCh64wLLslidarPointCloud1) > 0 ||
  281. len(masterConfig.RuleOfCh64wLLslidarPointCloud3) > 0) {
  282. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  283. Node: commonConfig.RosNode,
  284. Topic: topic,
  285. Callback: func(data *sensor_msgs.PointCloud2) {
  286. subscribersTimeMutexes[i].Lock()
  287. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  288. subscribersMutexes[i].Lock()
  289. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  290. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  291. faultLabel := ""
  292. if len(masterConfig.RuleOfCh64wLLslidarPointCloud1) > 0 {
  293. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud1 {
  294. faultLabel = f(data)
  295. if faultLabel != "" {
  296. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  297. subscribersTimes[i] = time.Now()
  298. goto TriggerSuccess
  299. }
  300. }
  301. }
  302. if len(masterConfig.RuleOfCh64wLLslidarPointCloud3) > 0 {
  303. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud3 {
  304. faultLabel = f(shareVars, data)
  305. if faultLabel != "" {
  306. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  307. subscribersTimes[i] = time.Now()
  308. goto TriggerSuccess
  309. }
  310. }
  311. }
  312. TriggerSuccess:
  313. subscribersMutexes[i].Unlock()
  314. }
  315. subscribersTimeMutexes[i].Unlock()
  316. },
  317. })
  318. }
  319. // 7
  320. if topic == masterConfig.TopicOfCh64wLScan &&
  321. (len(masterConfig.RuleOfCh64wLScan1) > 0 ||
  322. len(masterConfig.RuleOfCh64wLScan3) > 0) {
  323. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  324. Node: commonConfig.RosNode,
  325. Topic: topic,
  326. Callback: func(data *sensor_msgs.LaserScan) {
  327. subscribersTimeMutexes[i].Lock()
  328. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  329. subscribersMutexes[i].Lock()
  330. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  331. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  332. faultLabel := ""
  333. if len(masterConfig.RuleOfCh64wLScan1) > 0 {
  334. for _, f := range masterConfig.RuleOfCh64wLScan1 {
  335. faultLabel = f(data)
  336. if faultLabel != "" {
  337. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  338. subscribersTimes[i] = time.Now()
  339. goto TriggerSuccess
  340. }
  341. }
  342. }
  343. if len(masterConfig.RuleOfCh64wLScan3) > 0 {
  344. for _, f := range masterConfig.RuleOfCh64wLScan3 {
  345. faultLabel = f(shareVars, data)
  346. if faultLabel != "" {
  347. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  348. subscribersTimes[i] = time.Now()
  349. goto TriggerSuccess
  350. }
  351. }
  352. }
  353. TriggerSuccess:
  354. subscribersMutexes[i].Unlock()
  355. }
  356. subscribersTimeMutexes[i].Unlock()
  357. },
  358. })
  359. }
  360. // 8
  361. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud &&
  362. (len(masterConfig.RuleOfCh64wRLslidarPointCloud1) > 0 ||
  363. len(masterConfig.RuleOfCh64wRLslidarPointCloud3) > 0) {
  364. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  365. Node: commonConfig.RosNode,
  366. Topic: topic,
  367. Callback: func(data *sensor_msgs.PointCloud2) {
  368. subscribersTimeMutexes[i].Lock()
  369. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  370. subscribersMutexes[i].Lock()
  371. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  372. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  373. faultLabel := ""
  374. if len(masterConfig.RuleOfCh64wRLslidarPointCloud1) > 0 {
  375. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud1 {
  376. faultLabel = f(data)
  377. if faultLabel != "" {
  378. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  379. subscribersTimes[i] = time.Now()
  380. goto TriggerSuccess
  381. }
  382. }
  383. }
  384. if len(masterConfig.RuleOfCh64wRLslidarPointCloud3) > 0 {
  385. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud3 {
  386. faultLabel = f(shareVars, data)
  387. if faultLabel != "" {
  388. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  389. subscribersTimes[i] = time.Now()
  390. goto TriggerSuccess
  391. }
  392. }
  393. }
  394. TriggerSuccess:
  395. subscribersMutexes[i].Unlock()
  396. }
  397. subscribersTimeMutexes[i].Unlock()
  398. },
  399. })
  400. }
  401. // 9
  402. if topic == masterConfig.TopicOfCh64wRScan &&
  403. (len(masterConfig.RuleOfCh64wRScan1) > 0 ||
  404. len(masterConfig.RuleOfCh64wRScan3) > 0) {
  405. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  406. Node: commonConfig.RosNode,
  407. Topic: topic,
  408. Callback: func(data *sensor_msgs.LaserScan) {
  409. subscribersTimeMutexes[i].Lock()
  410. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  411. subscribersMutexes[i].Lock()
  412. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  413. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  414. faultLabel := ""
  415. if len(masterConfig.RuleOfCh64wRScan1) > 0 {
  416. for _, f := range masterConfig.RuleOfCh64wRScan1 {
  417. faultLabel = f(data)
  418. if faultLabel != "" {
  419. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  420. subscribersTimes[i] = time.Now()
  421. goto TriggerSuccess
  422. }
  423. }
  424. }
  425. if len(masterConfig.RuleOfCh64wRScan3) > 0 {
  426. for _, f := range masterConfig.RuleOfCh64wRScan3 {
  427. faultLabel = f(shareVars, data)
  428. if faultLabel != "" {
  429. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  430. subscribersTimes[i] = time.Now()
  431. goto TriggerSuccess
  432. }
  433. }
  434. }
  435. TriggerSuccess:
  436. subscribersMutexes[i].Unlock()
  437. }
  438. subscribersTimeMutexes[i].Unlock()
  439. },
  440. })
  441. }
  442. // 10
  443. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects &&
  444. (len(masterConfig.RuleOfCicvLidarclusterMovingObjects1) > 0 ||
  445. len(masterConfig.RuleOfCicvLidarclusterMovingObjects3) > 0) {
  446. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  447. Node: commonConfig.RosNode,
  448. Topic: topic,
  449. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  450. subscribersTimeMutexes[i].Lock()
  451. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  452. subscribersMutexes[i].Lock()
  453. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  454. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  455. faultLabel := ""
  456. if len(masterConfig.RuleOfCicvLidarclusterMovingObjects1) > 0 {
  457. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects1 {
  458. faultLabel = f(data)
  459. if faultLabel != "" {
  460. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  461. subscribersTimes[i] = time.Now()
  462. goto TriggerSuccess
  463. }
  464. }
  465. }
  466. if len(masterConfig.RuleOfCicvLidarclusterMovingObjects3) > 0 {
  467. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects3 {
  468. faultLabel = f(shareVars, data)
  469. if faultLabel != "" {
  470. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  471. subscribersTimes[i] = time.Now()
  472. goto TriggerSuccess
  473. }
  474. }
  475. }
  476. TriggerSuccess:
  477. subscribersMutexes[i].Unlock()
  478. }
  479. subscribersTimeMutexes[i].Unlock()
  480. },
  481. })
  482. }
  483. // 11 有共享变量的订阅者必须被创建
  484. if topic == masterConfig.TopicOfCicvAmrTrajectory {
  485. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  486. Node: commonConfig.RosNode,
  487. Topic: topic,
  488. Callback: func(data *pjisuv_msgs.Trajectory) {
  489. subscribersTimeMutexes[i].Lock()
  490. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  491. subscribersMutexes[i].Lock()
  492. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  493. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  494. faultLabel := ""
  495. if len(masterConfig.RuleOfCicvAmrTrajectory1) > 0 {
  496. for _, f := range masterConfig.RuleOfCicvAmrTrajectory1 {
  497. faultLabel = f(data)
  498. if faultLabel != "" {
  499. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  500. subscribersTimes[i] = time.Now()
  501. goto TriggerSuccess
  502. }
  503. }
  504. }
  505. if len(masterConfig.RuleOfCicvAmrTrajectory3) > 0 {
  506. for _, f := range masterConfig.RuleOfCicvAmrTrajectory3 {
  507. faultLabel = f(shareVars, data)
  508. if faultLabel != "" {
  509. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  510. subscribersTimes[i] = time.Now()
  511. goto TriggerSuccess
  512. }
  513. }
  514. }
  515. TriggerSuccess:
  516. subscribersMutexes[i].Unlock()
  517. }
  518. subscribersTimeMutexes[i].Unlock()
  519. // 更新共享变量
  520. currentCurvateres := make([]float64, 0)
  521. for _, point := range data.Trajectoryinfo.Trajectorypoints {
  522. currentCurvateres = append(currentCurvateres, math.Abs(float64(point.Curvature)))
  523. }
  524. shareVars.Store("LastCurvaturesOfCicvAmrTrajectory", currentCurvateres)
  525. shareVars.Store("DecisionType", data.Trajectoryinfo.DecisionType)
  526. },
  527. })
  528. }
  529. // 12 有共享变量的订阅者必须被创建
  530. if topic == masterConfig.TopicOfCicvLocation {
  531. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  532. Node: commonConfig.RosNode,
  533. Topic: topic,
  534. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  535. subscribersTimeMutexes[i].Lock()
  536. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  537. subscribersMutexes[i].Lock()
  538. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  539. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  540. faultLabel := ""
  541. if len(masterConfig.RuleOfCicvLocation1) > 0 {
  542. for _, f := range masterConfig.RuleOfCicvLocation1 {
  543. faultLabel = f(data)
  544. if faultLabel != "" {
  545. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  546. subscribersTimes[i] = time.Now()
  547. goto TriggerSuccess
  548. }
  549. }
  550. }
  551. if len(masterConfig.RuleOfCicvLocation3) > 0 {
  552. for _, f := range masterConfig.RuleOfCicvLocation3 {
  553. faultLabel = f(shareVars, data)
  554. if faultLabel != "" {
  555. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  556. subscribersTimes[i] = time.Now()
  557. goto TriggerSuccess
  558. }
  559. }
  560. }
  561. TriggerSuccess:
  562. subscribersMutexes[i].Unlock()
  563. }
  564. subscribersTimeMutexes[i].Unlock()
  565. // 更新共享变量
  566. shareVars.Store("VelocityXOfCicvLocation", data.VelocityX)
  567. shareVars.Store("VelocityYOfCicvLocation", data.VelocityY)
  568. shareVars.Store("VelocityZOfCicvLocation", data.VelocityZ)
  569. shareVars.Store("YawOfCicvLocation", data.Yaw)
  570. shareVars.Store("AngularVelocityZOfCicvLocation", data.AngularVelocityZ)
  571. shareVars.Store("PositionXOfCicvLocation", data.PositionX)
  572. shareVars.Store("PositionYOfCicvLocation", data.PositionY)
  573. // 用于判断是否在车间内
  574. if time.Since(cicvLocationTime).Seconds() > 1 {
  575. shareVars.Store("key", "value")
  576. cicvLocationTime = time.Now()
  577. }
  578. },
  579. })
  580. }
  581. // 13
  582. if topic == masterConfig.TopicOfCloudClusters &&
  583. (len(masterConfig.RuleOfCloudClusters1) > 0 ||
  584. len(masterConfig.RuleOfCloudClusters3) > 0) {
  585. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  586. Node: commonConfig.RosNode,
  587. Topic: topic,
  588. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  589. subscribersTimeMutexes[i].Lock()
  590. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  591. subscribersMutexes[i].Lock()
  592. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  593. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  594. faultLabel := ""
  595. if len(masterConfig.RuleOfCloudClusters1) > 0 {
  596. for _, f := range masterConfig.RuleOfCloudClusters1 {
  597. faultLabel = f(data)
  598. if faultLabel != "" {
  599. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  600. subscribersTimes[i] = time.Now()
  601. goto TriggerSuccess
  602. }
  603. }
  604. }
  605. if len(masterConfig.RuleOfCloudClusters3) > 0 {
  606. for _, f := range masterConfig.RuleOfCloudClusters3 {
  607. faultLabel = f(shareVars, data)
  608. if faultLabel != "" {
  609. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  610. subscribersTimes[i] = time.Now()
  611. goto TriggerSuccess
  612. }
  613. }
  614. }
  615. TriggerSuccess:
  616. subscribersMutexes[i].Unlock()
  617. }
  618. subscribersTimeMutexes[i].Unlock()
  619. },
  620. })
  621. }
  622. // 14
  623. if topic == masterConfig.TopicOfHeartbeatInfo &&
  624. (len(masterConfig.RuleOfHeartbeatInfo1) > 0 ||
  625. len(masterConfig.RuleOfHeartbeatInfo3) > 0) {
  626. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  627. Node: commonConfig.RosNode,
  628. Topic: topic,
  629. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  630. subscribersTimeMutexes[i].Lock()
  631. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  632. subscribersMutexes[i].Lock()
  633. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  634. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  635. faultLabel := ""
  636. if len(masterConfig.RuleOfHeartbeatInfo1) > 0 {
  637. for _, f := range masterConfig.RuleOfHeartbeatInfo1 {
  638. faultLabel = f(data)
  639. if faultLabel != "" {
  640. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  641. subscribersTimes[i] = time.Now()
  642. goto TriggerSuccess
  643. }
  644. }
  645. }
  646. if len(masterConfig.RuleOfHeartbeatInfo3) > 0 {
  647. for _, f := range masterConfig.RuleOfHeartbeatInfo3 {
  648. faultLabel = f(shareVars, data)
  649. if faultLabel != "" {
  650. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  651. subscribersTimes[i] = time.Now()
  652. goto TriggerSuccess
  653. }
  654. }
  655. }
  656. TriggerSuccess:
  657. subscribersMutexes[i].Unlock()
  658. }
  659. subscribersTimeMutexes[i].Unlock()
  660. },
  661. })
  662. }
  663. // 15
  664. if topic == masterConfig.TopicOfLidarPretreatmentCost &&
  665. (len(masterConfig.RuleOfLidarPretreatmentCost1) > 0 ||
  666. len(masterConfig.RuleOfLidarPretreatmentCost3) > 0) {
  667. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  668. Node: commonConfig.RosNode,
  669. Topic: topic,
  670. Callback: func(data *geometry_msgs.Vector3Stamped) {
  671. subscribersTimeMutexes[i].Lock()
  672. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  673. subscribersMutexes[i].Lock()
  674. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  675. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  676. faultLabel := ""
  677. if len(masterConfig.RuleOfLidarPretreatmentCost1) > 0 {
  678. for _, f := range masterConfig.RuleOfLidarPretreatmentCost1 {
  679. faultLabel = f(data)
  680. if faultLabel != "" {
  681. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  682. subscribersTimes[i] = time.Now()
  683. goto TriggerSuccess
  684. }
  685. }
  686. }
  687. if len(masterConfig.RuleOfLidarPretreatmentCost3) > 0 {
  688. for _, f := range masterConfig.RuleOfLidarPretreatmentCost3 {
  689. faultLabel = f(shareVars, data)
  690. if faultLabel != "" {
  691. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  692. subscribersTimes[i] = time.Now()
  693. goto TriggerSuccess
  694. }
  695. }
  696. }
  697. TriggerSuccess:
  698. subscribersMutexes[i].Unlock()
  699. }
  700. subscribersTimeMutexes[i].Unlock()
  701. },
  702. })
  703. }
  704. // 16
  705. if topic == masterConfig.TopicOfLidarPretreatmentOdometry &&
  706. (len(masterConfig.RuleOfLidarPretreatmentOdometry1) > 0 ||
  707. len(masterConfig.RuleOfLidarPretreatmentOdometry3) > 0) {
  708. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  709. Node: commonConfig.RosNode,
  710. Topic: topic,
  711. Callback: func(data *nav_msgs.Odometry) {
  712. subscribersTimeMutexes[i].Lock()
  713. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  714. subscribersMutexes[i].Lock()
  715. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  716. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  717. faultLabel := ""
  718. if len(masterConfig.RuleOfLidarPretreatmentOdometry1) > 0 {
  719. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry1 {
  720. faultLabel = f(data)
  721. if faultLabel != "" {
  722. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  723. subscribersTimes[i] = time.Now()
  724. goto TriggerSuccess
  725. }
  726. }
  727. }
  728. if len(masterConfig.RuleOfLidarPretreatmentOdometry3) > 0 {
  729. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry3 {
  730. faultLabel = f(shareVars, data)
  731. if faultLabel != "" {
  732. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  733. subscribersTimes[i] = time.Now()
  734. goto TriggerSuccess
  735. }
  736. }
  737. }
  738. TriggerSuccess:
  739. subscribersMutexes[i].Unlock()
  740. }
  741. subscribersTimeMutexes[i].Unlock()
  742. },
  743. })
  744. }
  745. // 17
  746. if topic == masterConfig.TopicOfLidarRoi &&
  747. (len(masterConfig.RuleOfLidarRoi1) > 0 ||
  748. len(masterConfig.RuleOfLidarRoi3) > 0) {
  749. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  750. Node: commonConfig.RosNode,
  751. Topic: topic,
  752. Callback: func(data *geometry_msgs.PolygonStamped) {
  753. subscribersTimeMutexes[i].Lock()
  754. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  755. subscribersMutexes[i].Lock()
  756. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  757. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  758. faultLabel := ""
  759. if len(masterConfig.RuleOfLidarRoi1) > 0 {
  760. for _, f := range masterConfig.RuleOfLidarRoi1 {
  761. faultLabel = f(data)
  762. if faultLabel != "" {
  763. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  764. subscribersTimes[i] = time.Now()
  765. goto TriggerSuccess
  766. }
  767. }
  768. }
  769. if len(masterConfig.RuleOfLidarRoi3) > 0 {
  770. for _, f := range masterConfig.RuleOfLidarRoi3 {
  771. faultLabel = f(shareVars, data)
  772. if faultLabel != "" {
  773. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  774. subscribersTimes[i] = time.Now()
  775. goto TriggerSuccess
  776. }
  777. }
  778. }
  779. TriggerSuccess:
  780. subscribersMutexes[i].Unlock()
  781. }
  782. subscribersTimeMutexes[i].Unlock()
  783. },
  784. })
  785. }
  786. // 18
  787. if topic == masterConfig.TopicOfLine1 &&
  788. (len(masterConfig.RuleOfLine11) > 0 ||
  789. len(masterConfig.RuleOfLine13) > 0) {
  790. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  791. Node: commonConfig.RosNode,
  792. Topic: topic,
  793. Callback: func(data *nav_msgs.Path) {
  794. subscribersTimeMutexes[i].Lock()
  795. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  796. subscribersMutexes[i].Lock()
  797. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  798. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  799. faultLabel := ""
  800. if len(masterConfig.RuleOfLine11) > 0 {
  801. for _, f := range masterConfig.RuleOfLine11 {
  802. faultLabel = f(data)
  803. if faultLabel != "" {
  804. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  805. subscribersTimes[i] = time.Now()
  806. goto TriggerSuccess
  807. }
  808. }
  809. }
  810. if len(masterConfig.RuleOfLine13) > 0 {
  811. for _, f := range masterConfig.RuleOfLine13 {
  812. faultLabel = f(shareVars, data)
  813. if faultLabel != "" {
  814. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  815. subscribersTimes[i] = time.Now()
  816. goto TriggerSuccess
  817. }
  818. }
  819. }
  820. TriggerSuccess:
  821. subscribersMutexes[i].Unlock()
  822. }
  823. subscribersTimeMutexes[i].Unlock()
  824. },
  825. })
  826. }
  827. // 19
  828. if topic == masterConfig.TopicOfLine2 &&
  829. (len(masterConfig.RuleOfLine21) > 0 ||
  830. len(masterConfig.RuleOfLine23) > 0) {
  831. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  832. Node: commonConfig.RosNode,
  833. Topic: topic,
  834. Callback: func(data *nav_msgs.Path) {
  835. subscribersTimeMutexes[i].Lock()
  836. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  837. subscribersMutexes[i].Lock()
  838. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  839. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  840. faultLabel := ""
  841. if len(masterConfig.RuleOfLine21) > 0 {
  842. for _, f := range masterConfig.RuleOfLine21 {
  843. faultLabel = f(data)
  844. if faultLabel != "" {
  845. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  846. subscribersTimes[i] = time.Now()
  847. goto TriggerSuccess
  848. }
  849. }
  850. }
  851. if len(masterConfig.RuleOfLine23) > 0 {
  852. for _, f := range masterConfig.RuleOfLine23 {
  853. faultLabel = f(shareVars, data)
  854. if faultLabel != "" {
  855. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  856. subscribersTimes[i] = time.Now()
  857. goto TriggerSuccess
  858. }
  859. }
  860. }
  861. TriggerSuccess:
  862. subscribersMutexes[i].Unlock()
  863. }
  864. subscribersTimeMutexes[i].Unlock()
  865. },
  866. })
  867. }
  868. // 20
  869. if topic == masterConfig.TopicOfMapPolygon &&
  870. (len(masterConfig.RuleOfMapPolygon1) > 0 ||
  871. len(masterConfig.RuleOfMapPolygon3) > 0) {
  872. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  873. Node: commonConfig.RosNode,
  874. Topic: topic,
  875. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  876. subscribersTimeMutexes[i].Lock()
  877. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  878. subscribersMutexes[i].Lock()
  879. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  880. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  881. faultLabel := ""
  882. if len(masterConfig.RuleOfMapPolygon1) > 0 {
  883. for _, f := range masterConfig.RuleOfMapPolygon1 {
  884. faultLabel = f(data)
  885. if faultLabel != "" {
  886. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  887. subscribersTimes[i] = time.Now()
  888. goto TriggerSuccess
  889. }
  890. }
  891. }
  892. if len(masterConfig.RuleOfMapPolygon3) > 0 {
  893. for _, f := range masterConfig.RuleOfMapPolygon3 {
  894. faultLabel = f(shareVars, data)
  895. if faultLabel != "" {
  896. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  897. subscribersTimes[i] = time.Now()
  898. goto TriggerSuccess
  899. }
  900. }
  901. }
  902. TriggerSuccess:
  903. subscribersMutexes[i].Unlock()
  904. }
  905. subscribersTimeMutexes[i].Unlock()
  906. },
  907. })
  908. }
  909. // 21
  910. if topic == masterConfig.TopicOfObstacleDisplay &&
  911. (len(masterConfig.RuleOfObstacleDisplay1) > 0 ||
  912. len(masterConfig.RuleOfObstacleDisplay3) > 0) {
  913. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  914. Node: commonConfig.RosNode,
  915. Topic: topic,
  916. Callback: func(data *visualization_msgs.MarkerArray) {
  917. subscribersTimeMutexes[i].Lock()
  918. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  919. subscribersMutexes[i].Lock()
  920. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  921. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  922. faultLabel := ""
  923. if len(masterConfig.RuleOfObstacleDisplay1) > 0 {
  924. for _, f := range masterConfig.RuleOfObstacleDisplay1 {
  925. faultLabel = f(data)
  926. if faultLabel != "" {
  927. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  928. subscribersTimes[i] = time.Now()
  929. goto TriggerSuccess
  930. }
  931. }
  932. }
  933. if len(masterConfig.RuleOfObstacleDisplay3) > 0 {
  934. for _, f := range masterConfig.RuleOfObstacleDisplay3 {
  935. faultLabel = f(shareVars, data)
  936. if faultLabel != "" {
  937. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  938. subscribersTimes[i] = time.Now()
  939. goto TriggerSuccess
  940. }
  941. }
  942. }
  943. TriggerSuccess:
  944. subscribersMutexes[i].Unlock()
  945. }
  946. subscribersTimeMutexes[i].Unlock()
  947. },
  948. })
  949. }
  950. // 22 有共享变量的订阅者必须被创建
  951. if topic == masterConfig.TopicOfPjControlPub {
  952. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  953. Node: commonConfig.RosNode,
  954. Topic: topic,
  955. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  956. subscribersTimeMutexes[i].Lock()
  957. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  958. subscribersMutexes[i].Lock()
  959. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  960. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  961. faultLabel := ""
  962. if len(masterConfig.RuleOfPjControlPub1) > 0 {
  963. for _, f := range masterConfig.RuleOfPjControlPub1 {
  964. faultLabel = f(data)
  965. if faultLabel != "" {
  966. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  967. subscribersTimes[i] = time.Now()
  968. goto TriggerSuccess
  969. }
  970. }
  971. }
  972. if len(masterConfig.RuleOfPjControlPub3) > 0 {
  973. for _, f := range masterConfig.RuleOfPjControlPub3 {
  974. faultLabel = f(shareVars, data)
  975. if faultLabel != "" {
  976. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  977. subscribersTimes[i] = time.Now()
  978. goto TriggerSuccess
  979. }
  980. }
  981. }
  982. TriggerSuccess:
  983. subscribersMutexes[i].Unlock()
  984. }
  985. subscribersTimeMutexes[i].Unlock()
  986. // 更新共享变量
  987. numCountPjiControlCommandOfPjControlPub++
  988. if numCountPjiControlCommandOfPjControlPub == 10 {
  989. egoSteeringCmdOfPjControlPub = append(egoSteeringCmdOfPjControlPub, data.ICPVCmdStrAngle)
  990. egoThrottleCmdOfPjControlPub = append(egoThrottleCmdOfPjControlPub, data.ICPVCmdAccPelPosAct)
  991. numCountPjiControlCommandOfPjControlPub = 0
  992. }
  993. shareVars.Store("NumCountPjiControlCommandOfPjControlPub", numCountPjiControlCommandOfPjControlPub)
  994. shareVars.Store("EgoSteeringCmdOfPjControlPub", egoSteeringCmdOfPjControlPub)
  995. shareVars.Store("EgoThrottleCmdOfPjControlPub", egoThrottleCmdOfPjControlPub)
  996. },
  997. })
  998. }
  999. // 23
  1000. if topic == masterConfig.TopicOfPointsCluster &&
  1001. (len(masterConfig.RuleOfPointsCluster1) > 0 ||
  1002. len(masterConfig.RuleOfPointsCluster3) > 0) {
  1003. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1004. Node: commonConfig.RosNode,
  1005. Topic: topic,
  1006. Callback: func(data *sensor_msgs.PointCloud2) {
  1007. subscribersTimeMutexes[i].Lock()
  1008. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1009. subscribersMutexes[i].Lock()
  1010. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1011. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1012. faultLabel := ""
  1013. if len(masterConfig.RuleOfPointsCluster1) > 0 {
  1014. for _, f := range masterConfig.RuleOfPointsCluster1 {
  1015. faultLabel = f(data)
  1016. if faultLabel != "" {
  1017. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1018. subscribersTimes[i] = time.Now()
  1019. goto TriggerSuccess
  1020. }
  1021. }
  1022. }
  1023. if len(masterConfig.RuleOfPointsCluster3) > 0 {
  1024. for _, f := range masterConfig.RuleOfPointsCluster3 {
  1025. faultLabel = f(shareVars, data)
  1026. if faultLabel != "" {
  1027. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1028. subscribersTimes[i] = time.Now()
  1029. goto TriggerSuccess
  1030. }
  1031. }
  1032. }
  1033. TriggerSuccess:
  1034. subscribersMutexes[i].Unlock()
  1035. }
  1036. subscribersTimeMutexes[i].Unlock()
  1037. },
  1038. })
  1039. }
  1040. // 24
  1041. if topic == masterConfig.TopicOfPointsConcat &&
  1042. (len(masterConfig.RuleOfPointsConcat1) > 0 ||
  1043. len(masterConfig.RuleOfPointsConcat3) > 0) {
  1044. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1045. Node: commonConfig.RosNode,
  1046. Topic: topic,
  1047. Callback: func(data *sensor_msgs.PointCloud2) {
  1048. subscribersTimeMutexes[i].Lock()
  1049. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1050. subscribersMutexes[i].Lock()
  1051. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1052. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1053. faultLabel := ""
  1054. if len(masterConfig.RuleOfPointsConcat1) > 0 {
  1055. for _, f := range masterConfig.RuleOfPointsConcat1 {
  1056. faultLabel = f(data)
  1057. if faultLabel != "" {
  1058. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1059. subscribersTimes[i] = time.Now()
  1060. goto TriggerSuccess
  1061. }
  1062. }
  1063. }
  1064. if len(masterConfig.RuleOfPointsConcat3) > 0 {
  1065. for _, f := range masterConfig.RuleOfPointsConcat3 {
  1066. faultLabel = f(shareVars, data)
  1067. if faultLabel != "" {
  1068. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1069. subscribersTimes[i] = time.Now()
  1070. goto TriggerSuccess
  1071. }
  1072. }
  1073. }
  1074. TriggerSuccess:
  1075. subscribersMutexes[i].Unlock()
  1076. }
  1077. subscribersTimeMutexes[i].Unlock()
  1078. },
  1079. })
  1080. }
  1081. // 25
  1082. if topic == masterConfig.TopicOfReferenceDisplay &&
  1083. (len(masterConfig.RuleOfReferenceDisplay1) > 0 ||
  1084. len(masterConfig.RuleOfReferenceDisplay3) > 0) {
  1085. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1086. Node: commonConfig.RosNode,
  1087. Topic: topic,
  1088. Callback: func(data *nav_msgs.Path) {
  1089. subscribersTimeMutexes[i].Lock()
  1090. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1091. subscribersMutexes[i].Lock()
  1092. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1093. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1094. faultLabel := ""
  1095. if len(masterConfig.RuleOfReferenceDisplay1) > 0 {
  1096. for _, f := range masterConfig.RuleOfReferenceDisplay1 {
  1097. faultLabel = f(data)
  1098. if faultLabel != "" {
  1099. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1100. subscribersTimes[i] = time.Now()
  1101. goto TriggerSuccess
  1102. }
  1103. }
  1104. }
  1105. if len(masterConfig.RuleOfReferenceDisplay3) > 0 {
  1106. for _, f := range masterConfig.RuleOfReferenceDisplay3 {
  1107. faultLabel = f(shareVars, data)
  1108. if faultLabel != "" {
  1109. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1110. subscribersTimes[i] = time.Now()
  1111. goto TriggerSuccess
  1112. }
  1113. }
  1114. }
  1115. TriggerSuccess:
  1116. subscribersMutexes[i].Unlock()
  1117. }
  1118. subscribersTimeMutexes[i].Unlock()
  1119. },
  1120. })
  1121. }
  1122. // 26
  1123. if topic == masterConfig.TopicOfReferenceTrajectory &&
  1124. (len(masterConfig.RuleOfReferenceTrajectory1) > 0 ||
  1125. len(masterConfig.RuleOfReferenceTrajectory3) > 0) {
  1126. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1127. Node: commonConfig.RosNode,
  1128. Topic: topic,
  1129. Callback: func(data *pjisuv_msgs.Trajectory) {
  1130. subscribersTimeMutexes[i].Lock()
  1131. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1132. subscribersMutexes[i].Lock()
  1133. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1134. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1135. faultLabel := ""
  1136. if len(masterConfig.RuleOfReferenceTrajectory1) > 0 {
  1137. for _, f := range masterConfig.RuleOfReferenceTrajectory1 {
  1138. faultLabel = f(data)
  1139. if faultLabel != "" {
  1140. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1141. subscribersTimes[i] = time.Now()
  1142. goto TriggerSuccess
  1143. }
  1144. }
  1145. }
  1146. if len(masterConfig.RuleOfReferenceTrajectory3) > 0 {
  1147. for _, f := range masterConfig.RuleOfReferenceTrajectory3 {
  1148. faultLabel = f(shareVars, data)
  1149. if faultLabel != "" {
  1150. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1151. subscribersTimes[i] = time.Now()
  1152. goto TriggerSuccess
  1153. }
  1154. }
  1155. }
  1156. TriggerSuccess:
  1157. subscribersMutexes[i].Unlock()
  1158. }
  1159. subscribersTimeMutexes[i].Unlock()
  1160. },
  1161. })
  1162. }
  1163. // 27
  1164. if topic == masterConfig.TopicOfRoiPoints &&
  1165. (len(masterConfig.RuleOfRoiPoints1) > 0 ||
  1166. len(masterConfig.RuleOfRoiPoints3) > 0) {
  1167. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1168. Node: commonConfig.RosNode,
  1169. Topic: topic,
  1170. Callback: func(data *sensor_msgs.PointCloud2) {
  1171. subscribersTimeMutexes[i].Lock()
  1172. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1173. subscribersMutexes[i].Lock()
  1174. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1175. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1176. faultLabel := ""
  1177. if len(masterConfig.RuleOfRoiPoints1) > 0 {
  1178. for _, f := range masterConfig.RuleOfRoiPoints1 {
  1179. faultLabel = f(data)
  1180. if faultLabel != "" {
  1181. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1182. subscribersTimes[i] = time.Now()
  1183. goto TriggerSuccess
  1184. }
  1185. }
  1186. }
  1187. if len(masterConfig.RuleOfRoiPoints3) > 0 {
  1188. for _, f := range masterConfig.RuleOfRoiPoints3 {
  1189. faultLabel = f(shareVars, data)
  1190. if faultLabel != "" {
  1191. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1192. subscribersTimes[i] = time.Now()
  1193. goto TriggerSuccess
  1194. }
  1195. }
  1196. }
  1197. TriggerSuccess:
  1198. subscribersMutexes[i].Unlock()
  1199. }
  1200. subscribersTimeMutexes[i].Unlock()
  1201. },
  1202. })
  1203. }
  1204. // 28
  1205. if topic == masterConfig.TopicOfRoiPolygon &&
  1206. (len(masterConfig.RuleOfRoiPolygon1) > 0 ||
  1207. len(masterConfig.RuleOfRoiPolygon3) > 0) {
  1208. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1209. Node: commonConfig.RosNode,
  1210. Topic: topic,
  1211. Callback: func(data *nav_msgs.Path) {
  1212. subscribersTimeMutexes[i].Lock()
  1213. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1214. subscribersMutexes[i].Lock()
  1215. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1216. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1217. faultLabel := ""
  1218. if len(masterConfig.RuleOfRoiPolygon1) > 0 {
  1219. for _, f := range masterConfig.RuleOfRoiPolygon1 {
  1220. faultLabel = f(data)
  1221. if faultLabel != "" {
  1222. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1223. subscribersTimes[i] = time.Now()
  1224. goto TriggerSuccess
  1225. }
  1226. }
  1227. }
  1228. if len(masterConfig.RuleOfRoiPolygon3) > 0 {
  1229. for _, f := range masterConfig.RuleOfRoiPolygon3 {
  1230. faultLabel = f(shareVars, data)
  1231. if faultLabel != "" {
  1232. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1233. subscribersTimes[i] = time.Now()
  1234. goto TriggerSuccess
  1235. }
  1236. }
  1237. }
  1238. TriggerSuccess:
  1239. subscribersMutexes[i].Unlock()
  1240. }
  1241. subscribersTimeMutexes[i].Unlock()
  1242. },
  1243. })
  1244. }
  1245. // 29
  1246. if topic == masterConfig.TopicOfTf &&
  1247. (len(masterConfig.RuleOfTf1) > 0 ||
  1248. len(masterConfig.RuleOfTf3) > 0) {
  1249. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1250. Node: commonConfig.RosNode,
  1251. Topic: topic,
  1252. Callback: func(data *tf2_msgs.TFMessage) {
  1253. subscribersTimeMutexes[i].Lock()
  1254. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1255. subscribersMutexes[i].Lock()
  1256. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1257. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1258. faultLabel := ""
  1259. if len(masterConfig.RuleOfTf1) > 0 {
  1260. for _, f := range masterConfig.RuleOfTf1 {
  1261. faultLabel = f(data)
  1262. if faultLabel != "" {
  1263. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1264. subscribersTimes[i] = time.Now()
  1265. goto TriggerSuccess
  1266. }
  1267. }
  1268. }
  1269. if len(masterConfig.RuleOfTf3) > 0 {
  1270. for _, f := range masterConfig.RuleOfTf3 {
  1271. faultLabel = f(shareVars, data)
  1272. if faultLabel != "" {
  1273. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1274. subscribersTimes[i] = time.Now()
  1275. goto TriggerSuccess
  1276. }
  1277. }
  1278. }
  1279. TriggerSuccess:
  1280. subscribersMutexes[i].Unlock()
  1281. }
  1282. subscribersTimeMutexes[i].Unlock()
  1283. },
  1284. })
  1285. }
  1286. // 30 有共享变量的订阅者必须被创建
  1287. if topic == masterConfig.TopicOfTpperception {
  1288. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1289. Node: commonConfig.RosNode,
  1290. Topic: topic,
  1291. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  1292. subscribersTimeMutexes[i].Lock()
  1293. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1294. subscribersMutexes[i].Lock()
  1295. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1296. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1297. faultLabel := ""
  1298. if len(masterConfig.RuleOfTpperception1) > 0 {
  1299. for _, f := range masterConfig.RuleOfTpperception1 {
  1300. faultLabel = f(data)
  1301. if faultLabel != "" {
  1302. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1303. subscribersTimes[i] = time.Now()
  1304. goto TriggerSuccess
  1305. }
  1306. }
  1307. }
  1308. if len(masterConfig.RuleOfTpperception3) > 0 {
  1309. for _, f := range masterConfig.RuleOfTpperception3 {
  1310. faultLabel = f(shareVars, data)
  1311. if faultLabel != "" {
  1312. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1313. subscribersTimes[i] = time.Now()
  1314. goto TriggerSuccess
  1315. }
  1316. }
  1317. }
  1318. TriggerSuccess:
  1319. subscribersMutexes[i].Unlock()
  1320. }
  1321. subscribersTimeMutexes[i].Unlock()
  1322. // 更新共享变量
  1323. for _, obj := range data.Objs {
  1324. if obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  1325. continue
  1326. }
  1327. if _, ok := objDicOfTpperception[obj.Id]; !ok {
  1328. objDicOfTpperception[obj.Id] = []float32{}
  1329. }
  1330. objDicOfTpperception[obj.Id] = append(objDicOfTpperception[obj.Id], obj.Y)
  1331. objTypeDicOfTpperception[obj.Id] = obj.Type
  1332. objSpeedDicOfTpperception[obj.Id] = math.Pow(math.Pow(float64(obj.Vxabs), 2)+math.Pow(float64(obj.Vyabs), 2), 0.5)
  1333. }
  1334. shareVars.Store("ObjDicOfTpperception", objDicOfTpperception)
  1335. shareVars.Store("ObjTypeDicOfTpperception", objTypeDicOfTpperception)
  1336. shareVars.Store("ObjSpeedDicOfTpperception", objSpeedDicOfTpperception)
  1337. },
  1338. })
  1339. }
  1340. // 31
  1341. if topic == masterConfig.TopicOfTpperceptionVis &&
  1342. (len(masterConfig.RuleOfTpperceptionVis1) > 0 ||
  1343. len(masterConfig.RuleOfTpperceptionVis3) > 0) {
  1344. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1345. Node: commonConfig.RosNode,
  1346. Topic: topic,
  1347. Callback: func(data *visualization_msgs.MarkerArray) {
  1348. subscribersTimeMutexes[i].Lock()
  1349. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1350. subscribersMutexes[i].Lock()
  1351. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1352. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1353. faultLabel := ""
  1354. if len(masterConfig.RuleOfTpperceptionVis1) > 0 {
  1355. for _, f := range masterConfig.RuleOfTpperceptionVis1 {
  1356. faultLabel = f(data)
  1357. if faultLabel != "" {
  1358. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1359. subscribersTimes[i] = time.Now()
  1360. goto TriggerSuccess
  1361. }
  1362. }
  1363. }
  1364. if len(masterConfig.RuleOfTpperceptionVis3) > 0 {
  1365. for _, f := range masterConfig.RuleOfTpperceptionVis3 {
  1366. faultLabel = f(shareVars, data)
  1367. if faultLabel != "" {
  1368. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1369. subscribersTimes[i] = time.Now()
  1370. goto TriggerSuccess
  1371. }
  1372. }
  1373. }
  1374. TriggerSuccess:
  1375. subscribersMutexes[i].Unlock()
  1376. }
  1377. subscribersTimeMutexes[i].Unlock()
  1378. },
  1379. })
  1380. }
  1381. // 32
  1382. if topic == masterConfig.TopicOfTprouteplan &&
  1383. (len(masterConfig.RuleOfTprouteplan1) > 0 ||
  1384. len(masterConfig.RuleOfTprouteplan3) > 0) {
  1385. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1386. Node: commonConfig.RosNode,
  1387. Topic: topic,
  1388. Callback: func(data *pjisuv_msgs.RoutePlan) {
  1389. subscribersTimeMutexes[i].Lock()
  1390. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1391. subscribersMutexes[i].Lock()
  1392. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1393. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1394. faultLabel := ""
  1395. if len(masterConfig.RuleOfTprouteplan1) > 0 {
  1396. for _, f := range masterConfig.RuleOfTprouteplan1 {
  1397. faultLabel = f(data)
  1398. if faultLabel != "" {
  1399. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1400. subscribersTimes[i] = time.Now()
  1401. goto TriggerSuccess
  1402. }
  1403. }
  1404. }
  1405. if len(masterConfig.RuleOfTprouteplan3) > 0 {
  1406. for _, f := range masterConfig.RuleOfTprouteplan3 {
  1407. faultLabel = f(shareVars, data)
  1408. if faultLabel != "" {
  1409. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1410. subscribersTimes[i] = time.Now()
  1411. goto TriggerSuccess
  1412. }
  1413. }
  1414. }
  1415. TriggerSuccess:
  1416. subscribersMutexes[i].Unlock()
  1417. }
  1418. subscribersTimeMutexes[i].Unlock()
  1419. },
  1420. })
  1421. }
  1422. // 33
  1423. if topic == masterConfig.TopicOfTrajectoryDisplay &&
  1424. (len(masterConfig.RuleOfTrajectoryDisplay1) > 0 ||
  1425. len(masterConfig.RuleOfTrajectoryDisplay3) > 0) {
  1426. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1427. Node: commonConfig.RosNode,
  1428. Topic: topic,
  1429. Callback: func(data *nav_msgs.Path) {
  1430. subscribersTimeMutexes[i].Lock()
  1431. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1432. subscribersMutexes[i].Lock()
  1433. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1434. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1435. faultLabel := ""
  1436. if len(masterConfig.RuleOfTrajectoryDisplay1) > 0 {
  1437. for _, f := range masterConfig.RuleOfTrajectoryDisplay1 {
  1438. faultLabel = f(data)
  1439. if faultLabel != "" {
  1440. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1441. subscribersTimes[i] = time.Now()
  1442. goto TriggerSuccess
  1443. }
  1444. }
  1445. }
  1446. if len(masterConfig.RuleOfTrajectoryDisplay3) > 0 {
  1447. for _, f := range masterConfig.RuleOfTrajectoryDisplay3 {
  1448. faultLabel = f(shareVars, data)
  1449. if faultLabel != "" {
  1450. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1451. subscribersTimes[i] = time.Now()
  1452. goto TriggerSuccess
  1453. }
  1454. }
  1455. }
  1456. TriggerSuccess:
  1457. subscribersMutexes[i].Unlock()
  1458. }
  1459. subscribersTimeMutexes[i].Unlock()
  1460. },
  1461. })
  1462. }
  1463. // 34
  1464. if topic == masterConfig.TopicOfUngroundCloudpoints &&
  1465. (len(masterConfig.RuleOfUngroundCloudpoints1) > 0 ||
  1466. len(masterConfig.RuleOfUngroundCloudpoints3) > 0) {
  1467. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1468. Node: commonConfig.RosNode,
  1469. Topic: topic,
  1470. Callback: func(data *sensor_msgs.PointCloud2) {
  1471. subscribersTimeMutexes[i].Lock()
  1472. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1473. subscribersMutexes[i].Lock()
  1474. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1475. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1476. faultLabel := ""
  1477. if len(masterConfig.RuleOfUngroundCloudpoints1) > 0 {
  1478. for _, f := range masterConfig.RuleOfUngroundCloudpoints1 {
  1479. faultLabel = f(data)
  1480. if faultLabel != "" {
  1481. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1482. subscribersTimes[i] = time.Now()
  1483. goto TriggerSuccess
  1484. }
  1485. }
  1486. }
  1487. if len(masterConfig.RuleOfUngroundCloudpoints3) > 0 {
  1488. for _, f := range masterConfig.RuleOfUngroundCloudpoints3 {
  1489. faultLabel = f(shareVars, data)
  1490. if faultLabel != "" {
  1491. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1492. subscribersTimes[i] = time.Now()
  1493. goto TriggerSuccess
  1494. }
  1495. }
  1496. }
  1497. TriggerSuccess:
  1498. subscribersMutexes[i].Unlock()
  1499. }
  1500. subscribersTimeMutexes[i].Unlock()
  1501. },
  1502. })
  1503. }
  1504. // 35
  1505. if topic == masterConfig.TopicOfCameraImage &&
  1506. (len(masterConfig.RuleOfCameraImage1) > 0 ||
  1507. len(masterConfig.RuleOfCameraImage3) > 0) {
  1508. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1509. Node: commonConfig.RosNode,
  1510. Topic: topic,
  1511. Callback: func(data *sensor_msgs.Image) {
  1512. subscribersTimeMutexes[i].Lock()
  1513. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1514. subscribersMutexes[i].Lock()
  1515. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1516. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1517. faultLabel := ""
  1518. if len(masterConfig.RuleOfCameraImage1) > 0 {
  1519. for _, f := range masterConfig.RuleOfCameraImage1 {
  1520. faultLabel = f(data)
  1521. if faultLabel != "" {
  1522. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1523. subscribersTimes[i] = time.Now()
  1524. goto TriggerSuccess
  1525. }
  1526. }
  1527. }
  1528. if len(masterConfig.RuleOfCameraImage3) > 0 {
  1529. for _, f := range masterConfig.RuleOfCameraImage3 {
  1530. faultLabel = f(shareVars, data)
  1531. if faultLabel != "" {
  1532. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1533. subscribersTimes[i] = time.Now()
  1534. goto TriggerSuccess
  1535. }
  1536. }
  1537. }
  1538. TriggerSuccess:
  1539. subscribersMutexes[i].Unlock()
  1540. }
  1541. subscribersTimeMutexes[i].Unlock()
  1542. },
  1543. })
  1544. }
  1545. // 36 有共享变量的订阅者必须被创建
  1546. if topic == masterConfig.TopicOfDataRead {
  1547. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1548. Node: commonConfig.RosNode,
  1549. Topic: topic,
  1550. Callback: func(data *pjisuv_msgs.Retrieval) {
  1551. subscribersTimeMutexes[i].Lock()
  1552. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1553. subscribersMutexes[i].Lock()
  1554. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1555. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1556. faultLabel := ""
  1557. if len(masterConfig.RuleOfDataRead1) > 0 {
  1558. for _, f := range masterConfig.RuleOfDataRead1 {
  1559. faultLabel = f(data)
  1560. if faultLabel != "" {
  1561. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1562. subscribersTimes[i] = time.Now()
  1563. goto TriggerSuccess
  1564. }
  1565. }
  1566. }
  1567. if len(masterConfig.RuleOfDataRead3) > 0 {
  1568. for _, f := range masterConfig.RuleOfDataRead3 {
  1569. faultLabel = f(shareVars, data)
  1570. if faultLabel != "" {
  1571. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1572. subscribersTimes[i] = time.Now()
  1573. goto TriggerSuccess
  1574. }
  1575. }
  1576. }
  1577. TriggerSuccess:
  1578. subscribersMutexes[i].Unlock()
  1579. }
  1580. subscribersTimeMutexes[i].Unlock()
  1581. // 更新共享变量
  1582. numCountDataReadOfDataRead++
  1583. if numCountDataReadOfDataRead == 10 {
  1584. egoSteeringRealOfDataRead = append(egoSteeringRealOfDataRead, data.ActStrWhAng)
  1585. egoThrottleRealOfDataRead = append(egoThrottleRealOfDataRead, data.AccPed2)
  1586. numCountDataReadOfDataRead = 0
  1587. }
  1588. shareVars.Store("NumCountDataReadOfDataRead", numCountDataReadOfDataRead)
  1589. shareVars.Store("EgoSteeringRealOfDataRead", egoSteeringRealOfDataRead)
  1590. shareVars.Store("EgoThrottleRealOfDataRead", egoThrottleRealOfDataRead)
  1591. shareVars.Store("ActStrWhAngOfDataRead", data.ActStrWhAng)
  1592. },
  1593. })
  1594. }
  1595. // 37
  1596. if topic == masterConfig.TopicOfPjiGps &&
  1597. (len(masterConfig.RuleOfPjiGps1) > 0 ||
  1598. len(masterConfig.RuleOfPjiGps3) > 0) {
  1599. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1600. Node: commonConfig.RosNode,
  1601. Topic: topic,
  1602. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  1603. subscribersTimeMutexes[i].Lock()
  1604. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1605. subscribersMutexes[i].Lock()
  1606. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1607. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1608. faultLabel := ""
  1609. if len(masterConfig.RuleOfPjiGps1) > 0 {
  1610. for _, f := range masterConfig.RuleOfPjiGps1 {
  1611. faultLabel = f(data)
  1612. if faultLabel != "" {
  1613. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1614. subscribersTimes[i] = time.Now()
  1615. goto TriggerSuccess
  1616. }
  1617. }
  1618. }
  1619. if len(masterConfig.RuleOfPjiGps3) > 0 {
  1620. for _, f := range masterConfig.RuleOfPjiGps3 {
  1621. faultLabel = f(shareVars, data)
  1622. if faultLabel != "" {
  1623. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1624. subscribersTimes[i] = time.Now()
  1625. goto TriggerSuccess
  1626. }
  1627. }
  1628. }
  1629. TriggerSuccess:
  1630. subscribersMutexes[i].Unlock()
  1631. }
  1632. subscribersTimeMutexes[i].Unlock()
  1633. },
  1634. })
  1635. }
  1636. // 39 有共享变量的订阅者必须被创建
  1637. if topic == masterConfig.TopicOfPjVehicleFdbPub {
  1638. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1639. Node: commonConfig.RosNode,
  1640. Topic: topic,
  1641. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  1642. subscribersTimeMutexes[i].Lock()
  1643. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1644. subscribersMutexes[i].Lock()
  1645. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1646. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1647. faultLabel := ""
  1648. if len(masterConfig.RuleOfPjVehicleFdbPub1) > 0 {
  1649. for _, f := range masterConfig.RuleOfPjVehicleFdbPub1 {
  1650. faultLabel = f(data)
  1651. if faultLabel != "" {
  1652. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1653. subscribersTimes[i] = time.Now()
  1654. goto TriggerSuccess
  1655. }
  1656. }
  1657. }
  1658. if len(masterConfig.RuleOfPjVehicleFdbPub3) > 0 {
  1659. for _, f := range masterConfig.RuleOfPjVehicleFdbPub3 {
  1660. faultLabel = f(shareVars, data)
  1661. if faultLabel != "" {
  1662. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1663. subscribersTimes[i] = time.Now()
  1664. goto TriggerSuccess
  1665. }
  1666. }
  1667. }
  1668. TriggerSuccess:
  1669. subscribersMutexes[i].Unlock()
  1670. }
  1671. subscribersTimeMutexes[i].Unlock()
  1672. // 更新共享变量
  1673. shareVars.Store("AutomodeOfPjVehicleFdbPub", data.Automode)
  1674. },
  1675. })
  1676. }
  1677. // 40 有共享变量的订阅者必须被创建
  1678. if topic == masterConfig.TopicOfEndPointMessage {
  1679. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1680. Node: commonConfig.RosNode,
  1681. Topic: topic,
  1682. Callback: func(data *geometry_msgs.Point) {
  1683. subscribersTimeMutexes[i].Lock()
  1684. if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
  1685. subscribersMutexes[i].Lock()
  1686. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1687. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1688. faultLabel := ""
  1689. if len(masterConfig.RuleOfEndPointMessage1) > 0 {
  1690. for _, f := range masterConfig.RuleOfEndPointMessage1 {
  1691. faultLabel = f(data)
  1692. if faultLabel != "" {
  1693. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1694. subscribersTimes[i] = time.Now()
  1695. goto TriggerSuccess
  1696. }
  1697. }
  1698. }
  1699. if len(masterConfig.RuleOfEndPointMessage3) > 0 {
  1700. for _, f := range masterConfig.RuleOfEndPointMessage3 {
  1701. faultLabel = f(shareVars, data)
  1702. if faultLabel != "" {
  1703. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1704. subscribersTimes[i] = time.Now()
  1705. goto TriggerSuccess
  1706. }
  1707. }
  1708. }
  1709. TriggerSuccess:
  1710. subscribersMutexes[i].Unlock()
  1711. }
  1712. subscribersTimeMutexes[i].Unlock()
  1713. // 更新共享变量
  1714. shareVars.Store("EndPointX", data.X)
  1715. shareVars.Store("EndPointY", data.Y)
  1716. },
  1717. })
  1718. }
  1719. if err != nil {
  1720. c_log.GlobalLogger.Info("创建订阅者报错,可能由于节点未启动,再次尝试:", err)
  1721. time.Sleep(time.Duration(2) * time.Second)
  1722. continue
  1723. } else {
  1724. break
  1725. }
  1726. }
  1727. }
  1728. select {
  1729. case signal := <-service.ChannelKillWindowProducer:
  1730. if signal == 1 {
  1731. commonConfig.RosNode.Close()
  1732. service.AddKillTimes("3")
  1733. return
  1734. }
  1735. }
  1736. }
  1737. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  1738. saveTimeWindowMutex.Lock()
  1739. defer saveTimeWindowMutex.Unlock()
  1740. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1741. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) { // 如果是不在旧故障窗口内,添加一个新窗口
  1742. exceptBegin := util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime)
  1743. finalTimeWindowBegin := ""
  1744. if util.TimeCustom1LessEqualThanTimeCustom2(exceptBegin, latestTimeWindowEnd) { // 窗口最早时间不能早于上一个窗口结束时间
  1745. finalTimeWindowBegin = latestTimeWindowEnd
  1746. } else {
  1747. finalTimeWindowBegin = exceptBegin
  1748. }
  1749. latestTimeWindowEnd = util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  1750. newTimeWindow := commonEntity.TimeWindow{
  1751. FaultTime: faultHappenTime,
  1752. TimeWindowBegin: finalTimeWindowBegin,
  1753. TimeWindowEnd: latestTimeWindowEnd,
  1754. Length: util.CalculateDifferenceOfTimeCustom(finalTimeWindowBegin, latestTimeWindowEnd),
  1755. Labels: []string{faultLabel},
  1756. MasterTopics: masterTopics,
  1757. SlaveTopics: slaveTopics,
  1758. }
  1759. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1760. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1761. } else { // 如果在旧故障窗口内
  1762. commonEntity.TimeWindowProducerQueueMutex.RLock()
  1763. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  1764. // 更新故障窗口end时间
  1765. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime) // 窗口期望关闭时间是触发时间加上后置时间
  1766. expectLength := util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1767. if expectLength < commonConfig.PlatformConfig.TaskMaxTime {
  1768. latestTimeWindowEnd = expectEnd
  1769. lastTimeWindow.TimeWindowEnd = latestTimeWindowEnd
  1770. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, lastTimeWindow.TimeWindowEnd)
  1771. }
  1772. // 更新label
  1773. labels := lastTimeWindow.Labels
  1774. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1775. // 更新 topic
  1776. sourceMasterTopics := lastTimeWindow.MasterTopics
  1777. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1778. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1779. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1780. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1781. }
  1782. }
  1783. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1784. // 获取所有需要采集的topic
  1785. var faultCodeTopics []string
  1786. for _, code := range commonConfig.CloudConfig.Triggers {
  1787. if code.Label == faultLabel {
  1788. faultCodeTopics = code.Topics
  1789. }
  1790. }
  1791. // 根据不同节点采集的topic进行分配采集
  1792. for _, acceptTopic := range faultCodeTopics {
  1793. for _, host := range commonConfig.CloudConfig.Hosts {
  1794. for _, topic := range host.Topics {
  1795. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1796. masterTopics = append(masterTopics, acceptTopic)
  1797. }
  1798. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1799. slaveTopics = append(slaveTopics, acceptTopic)
  1800. }
  1801. }
  1802. }
  1803. }
  1804. return masterTopics, slaveTopics
  1805. }