produce_window.go 73 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/aarch64/pjisuv/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/pjisuv_msgs"
  10. "cicv-data-closedloop/pjisuv_param"
  11. "github.com/bluenviron/goroslib/v2"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
  13. "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
  14. "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
  15. "github.com/bluenviron/goroslib/v2/pkg/msgs/tf2_msgs"
  16. "github.com/bluenviron/goroslib/v2/pkg/msgs/visualization_msgs"
  17. "math"
  18. "sync"
  19. "time"
  20. )
  21. // Package-level state shared by the subscriber callbacks and the extend-rule goroutine.
  22. var (
  23. lastGlobalTriggerTime = time.Now() // receives the old thisGlobalTriggerTime whenever a rule fires
  24. thisGlobalTriggerTime = time.Now() // reset to time.Now() whenever a rule fires
  25. pjisuvParam = pjisuv_param.PjisuvParam{ // handed to the rule functions; callbacks also write its fields
  26. ObjDicOfTpperception: make(map[uint32][]float32), // maps are created up front — presumably so later writes never hit a nil map; TODO confirm
  27. ObjTypeDicOfTpperception: make(map[uint32]uint8),
  28. ObjSpeedDicOfTpperception: make(map[uint32]float64),
  29. } // The mutexes below are labelled by topic. First: /cicv_location
  30. mutexOfCicvLocation sync.RWMutex // held while the *OfCicvLocation fields of pjisuvParam are updated. NOTE(review): the visible writer takes RLock, not Lock — confirm that is intended
  31. // /tpperception (no use of this lock is visible in this part of the file)
  32. mutexOfTpperception sync.RWMutex
  33. // /pj_control_pub (no use of this lock is visible in this part of the file)
  34. mutexOfPjControlPub sync.RWMutex
  35. // /data_read (no use of this lock is visible in this part of the file)
  36. mutexOfDataRead sync.RWMutex
  37. // /pj_vehicle_fdb_pub (no use of this lock is visible in this part of the file)
  38. mutexOfPjVehicleFdbPub sync.RWMutex
  39. // Topic masterConfig.TopicOfCicvAmrTrajectory: held while pjisuvParam.LastCurvaturesOfCicvAmrTrajectory is replaced. NOTE(review): the visible writer takes RLock, not Lock — confirm
  40. mutexOfCicvAmrTrajectory sync.RWMutex
  41. )
  42. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  43. func PrepareTimeWindowProducerQueue() {
  44. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  45. var err error
  46. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  47. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  48. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  49. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  50. for i, topic := range commonConfig.SubscribeTopics {
  51. // 增加了可扩展性
  52. if topic == masterConfig.TopicOfCicvExtend {
  53. go func() {
  54. for {
  55. time.Sleep(time.Duration(3500) * time.Millisecond)
  56. for _, f := range masterConfig.RuleOfCicvExtend {
  57. label := f(pjisuvParam)
  58. if label != "" {
  59. saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
  60. lastGlobalTriggerTime = thisGlobalTriggerTime
  61. thisGlobalTriggerTime = time.Now()
  62. subscribersTimes[i] = thisGlobalTriggerTime
  63. break
  64. }
  65. }
  66. }
  67. }()
  68. }
  69. // 其他常规监听器
  70. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  71. // 1
  72. if topic == masterConfig.TopicOfAmrPose && (len(masterConfig.RuleOfAmrPose1) > 0 || len(masterConfig.RuleOfAmrPose2) > 0) {
  73. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  74. Node: commonConfig.RosNode,
  75. Topic: topic,
  76. Callback: func(data *visualization_msgs.MarkerArray) {
  77. subscribersTimeMutexes[i].Lock()
  78. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  79. subscribersMutexes[i].Lock()
  80. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  81. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  82. faultLabel := ""
  83. if len(masterConfig.RuleOfAmrPose1) > 0 {
  84. for _, f := range masterConfig.RuleOfAmrPose1 {
  85. faultLabel = f(data)
  86. if faultLabel != "" {
  87. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  88. lastGlobalTriggerTime = thisGlobalTriggerTime
  89. thisGlobalTriggerTime = time.Now()
  90. subscribersTimes[i] = thisGlobalTriggerTime
  91. goto TriggerSuccess
  92. }
  93. }
  94. }
  95. if len(masterConfig.RuleOfAmrPose2) > 0 {
  96. for _, f := range masterConfig.RuleOfAmrPose2 {
  97. faultLabel = f(data, &pjisuvParam)
  98. if faultLabel != "" {
  99. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  100. lastGlobalTriggerTime = thisGlobalTriggerTime
  101. thisGlobalTriggerTime = time.Now()
  102. subscribersTimes[i] = thisGlobalTriggerTime
  103. goto TriggerSuccess
  104. }
  105. }
  106. }
  107. TriggerSuccess:
  108. subscribersMutexes[i].Unlock()
  109. }
  110. subscribersTimeMutexes[i].Unlock()
  111. },
  112. })
  113. }
  114. // 2
  115. if topic == masterConfig.TopicOfBoundingBoxesFast && (len(masterConfig.RuleOfBoundingBoxesFast1) > 0 || len(masterConfig.RuleOfBoundingBoxesFast2) > 0) {
  116. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  117. Node: commonConfig.RosNode,
  118. Topic: topic,
  119. Callback: func(data *pjisuv_msgs.BoundingBoxArray) {
  120. subscribersTimeMutexes[i].Lock()
  121. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  122. subscribersMutexes[i].Lock()
  123. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  124. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  125. faultLabel := ""
  126. if len(masterConfig.RuleOfBoundingBoxesFast1) > 0 {
  127. for _, f := range masterConfig.RuleOfBoundingBoxesFast1 {
  128. faultLabel = f(data)
  129. if faultLabel != "" {
  130. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  131. lastGlobalTriggerTime = thisGlobalTriggerTime
  132. thisGlobalTriggerTime = time.Now()
  133. subscribersTimes[i] = thisGlobalTriggerTime
  134. goto TriggerSuccess
  135. }
  136. }
  137. }
  138. if len(masterConfig.RuleOfBoundingBoxesFast2) > 0 {
  139. for _, f := range masterConfig.RuleOfBoundingBoxesFast2 {
  140. faultLabel = f(data, &pjisuvParam)
  141. if faultLabel != "" {
  142. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  143. lastGlobalTriggerTime = thisGlobalTriggerTime
  144. thisGlobalTriggerTime = time.Now()
  145. subscribersTimes[i] = thisGlobalTriggerTime
  146. goto TriggerSuccess
  147. }
  148. }
  149. }
  150. TriggerSuccess:
  151. subscribersMutexes[i].Unlock()
  152. }
  153. subscribersTimeMutexes[i].Unlock()
  154. },
  155. })
  156. }
  157. // 3
  158. if topic == masterConfig.TopicOfCameraFault && (len(masterConfig.RuleOfCameraFault1) > 0 || len(masterConfig.RuleOfCameraFault2) > 0) {
  159. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  160. Node: commonConfig.RosNode,
  161. Topic: topic,
  162. Callback: func(data *pjisuv_msgs.FaultVec) {
  163. subscribersTimeMutexes[i].Lock()
  164. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  165. subscribersMutexes[i].Lock()
  166. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  167. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  168. faultLabel := ""
  169. if len(masterConfig.RuleOfCameraFault1) > 0 {
  170. for _, f := range masterConfig.RuleOfCameraFault1 {
  171. faultLabel = f(data)
  172. if faultLabel != "" {
  173. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  174. lastGlobalTriggerTime = thisGlobalTriggerTime
  175. thisGlobalTriggerTime = time.Now()
  176. subscribersTimes[i] = thisGlobalTriggerTime
  177. goto TriggerSuccess
  178. }
  179. }
  180. }
  181. if len(masterConfig.RuleOfCameraFault2) > 0 {
  182. for _, f := range masterConfig.RuleOfCameraFault2 {
  183. faultLabel = f(data, &pjisuvParam)
  184. if faultLabel != "" {
  185. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  186. lastGlobalTriggerTime = thisGlobalTriggerTime
  187. thisGlobalTriggerTime = time.Now()
  188. subscribersTimes[i] = thisGlobalTriggerTime
  189. goto TriggerSuccess
  190. }
  191. }
  192. }
  193. TriggerSuccess:
  194. subscribersMutexes[i].Unlock()
  195. }
  196. subscribersTimeMutexes[i].Unlock()
  197. },
  198. })
  199. }
  200. // 4
  201. if topic == masterConfig.TopicOfCanData && (len(masterConfig.RuleOfCanData1) > 0 || len(masterConfig.RuleOfCanData2) > 0) {
  202. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  203. Node: commonConfig.RosNode,
  204. Topic: topic,
  205. Callback: func(data *pjisuv_msgs.Frame) {
  206. subscribersTimeMutexes[i].Lock()
  207. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  208. subscribersMutexes[i].Lock()
  209. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  210. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  211. faultLabel := ""
  212. if len(masterConfig.RuleOfCanData1) > 0 {
  213. for _, f := range masterConfig.RuleOfCanData1 {
  214. faultLabel = f(data)
  215. if faultLabel != "" {
  216. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  217. lastGlobalTriggerTime = thisGlobalTriggerTime
  218. thisGlobalTriggerTime = time.Now()
  219. subscribersTimes[i] = thisGlobalTriggerTime
  220. goto TriggerSuccess
  221. }
  222. }
  223. }
  224. if len(masterConfig.RuleOfCanData2) > 0 {
  225. for _, f := range masterConfig.RuleOfCanData2 {
  226. faultLabel = f(data, &pjisuvParam)
  227. if faultLabel != "" {
  228. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  229. lastGlobalTriggerTime = thisGlobalTriggerTime
  230. thisGlobalTriggerTime = time.Now()
  231. subscribersTimes[i] = thisGlobalTriggerTime
  232. goto TriggerSuccess
  233. }
  234. }
  235. }
  236. TriggerSuccess:
  237. subscribersMutexes[i].Unlock()
  238. }
  239. subscribersTimeMutexes[i].Unlock()
  240. },
  241. })
  242. }
  243. // 5
  244. if topic == masterConfig.TopicOfCh128x1LslidarPointCloud && (len(masterConfig.RuleOfCh128x1LslidarPointCloud1) > 0 || len(masterConfig.RuleOfCh128x1LslidarPointCloud2) > 0) {
  245. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  246. Node: commonConfig.RosNode,
  247. Topic: topic,
  248. Callback: func(data *sensor_msgs.PointCloud2) {
  249. subscribersTimeMutexes[i].Lock()
  250. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  251. subscribersMutexes[i].Lock()
  252. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  253. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  254. faultLabel := ""
  255. if len(masterConfig.RuleOfCh128x1LslidarPointCloud1) > 0 {
  256. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud1 {
  257. faultLabel = f(data)
  258. if faultLabel != "" {
  259. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  260. lastGlobalTriggerTime = thisGlobalTriggerTime
  261. thisGlobalTriggerTime = time.Now()
  262. subscribersTimes[i] = thisGlobalTriggerTime
  263. goto TriggerSuccess
  264. }
  265. }
  266. }
  267. if len(masterConfig.RuleOfCh128x1LslidarPointCloud2) > 0 {
  268. for _, f := range masterConfig.RuleOfCh128x1LslidarPointCloud2 {
  269. faultLabel = f(data, &pjisuvParam)
  270. if faultLabel != "" {
  271. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  272. lastGlobalTriggerTime = thisGlobalTriggerTime
  273. thisGlobalTriggerTime = time.Now()
  274. subscribersTimes[i] = thisGlobalTriggerTime
  275. goto TriggerSuccess
  276. }
  277. }
  278. }
  279. TriggerSuccess:
  280. subscribersMutexes[i].Unlock()
  281. }
  282. subscribersTimeMutexes[i].Unlock()
  283. },
  284. })
  285. }
  286. // 6
  287. if topic == masterConfig.TopicOfCh64wLLslidarPointCloud && (len(masterConfig.RuleOfCh64wLLslidarPointCloud1) > 0 || len(masterConfig.RuleOfCh64wLLslidarPointCloud2) > 1) {
  288. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  289. Node: commonConfig.RosNode,
  290. Topic: topic,
  291. Callback: func(data *sensor_msgs.PointCloud2) {
  292. subscribersTimeMutexes[i].Lock()
  293. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  294. subscribersMutexes[i].Lock()
  295. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  296. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  297. faultLabel := ""
  298. if len(masterConfig.RuleOfCh64wLLslidarPointCloud1) > 0 {
  299. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud1 {
  300. faultLabel = f(data)
  301. if faultLabel != "" {
  302. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  303. lastGlobalTriggerTime = thisGlobalTriggerTime
  304. thisGlobalTriggerTime = time.Now()
  305. subscribersTimes[i] = thisGlobalTriggerTime
  306. goto TriggerSuccess
  307. }
  308. }
  309. }
  310. if len(masterConfig.RuleOfCh64wLLslidarPointCloud2) > 0 {
  311. for _, f := range masterConfig.RuleOfCh64wLLslidarPointCloud2 {
  312. faultLabel = f(data, &pjisuvParam)
  313. if faultLabel != "" {
  314. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  315. lastGlobalTriggerTime = thisGlobalTriggerTime
  316. thisGlobalTriggerTime = time.Now()
  317. subscribersTimes[i] = thisGlobalTriggerTime
  318. goto TriggerSuccess
  319. }
  320. }
  321. }
  322. TriggerSuccess:
  323. subscribersMutexes[i].Unlock()
  324. }
  325. subscribersTimeMutexes[i].Unlock()
  326. },
  327. })
  328. }
  329. // 7
  330. if topic == masterConfig.TopicOfCh64wLScan && (len(masterConfig.RuleOfCh64wLScan1) > 0 || len(masterConfig.RuleOfCh64wLScan2) > 0) {
  331. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  332. Node: commonConfig.RosNode,
  333. Topic: topic,
  334. Callback: func(data *sensor_msgs.LaserScan) {
  335. subscribersTimeMutexes[i].Lock()
  336. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  337. subscribersMutexes[i].Lock()
  338. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  339. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  340. faultLabel := ""
  341. if len(masterConfig.RuleOfCh64wLScan1) > 0 {
  342. for _, f := range masterConfig.RuleOfCh64wLScan1 {
  343. faultLabel = f(data)
  344. if faultLabel != "" {
  345. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  346. lastGlobalTriggerTime = thisGlobalTriggerTime
  347. thisGlobalTriggerTime = time.Now()
  348. subscribersTimes[i] = thisGlobalTriggerTime
  349. goto TriggerSuccess
  350. }
  351. }
  352. }
  353. if len(masterConfig.RuleOfCh64wLScan2) > 0 {
  354. for _, f := range masterConfig.RuleOfCh64wLScan2 {
  355. faultLabel = f(data, &pjisuvParam)
  356. if faultLabel != "" {
  357. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  358. lastGlobalTriggerTime = thisGlobalTriggerTime
  359. thisGlobalTriggerTime = time.Now()
  360. subscribersTimes[i] = thisGlobalTriggerTime
  361. goto TriggerSuccess
  362. }
  363. }
  364. }
  365. TriggerSuccess:
  366. subscribersMutexes[i].Unlock()
  367. }
  368. subscribersTimeMutexes[i].Unlock()
  369. },
  370. })
  371. }
  372. // 8
  373. if topic == masterConfig.TopicOfCh64wRLslidarPointCloud && (len(masterConfig.RuleOfCh64wRLslidarPointCloud1) > 0 || len(masterConfig.RuleOfCh64wRLslidarPointCloud2) > 0) {
  374. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  375. Node: commonConfig.RosNode,
  376. Topic: topic,
  377. Callback: func(data *sensor_msgs.PointCloud2) {
  378. subscribersTimeMutexes[i].Lock()
  379. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  380. subscribersMutexes[i].Lock()
  381. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  382. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  383. faultLabel := ""
  384. if len(masterConfig.RuleOfCh64wRLslidarPointCloud1) > 0 {
  385. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud1 {
  386. faultLabel = f(data)
  387. if faultLabel != "" {
  388. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  389. lastGlobalTriggerTime = thisGlobalTriggerTime
  390. thisGlobalTriggerTime = time.Now()
  391. subscribersTimes[i] = thisGlobalTriggerTime
  392. goto TriggerSuccess
  393. }
  394. }
  395. }
  396. if len(masterConfig.RuleOfCh64wRLslidarPointCloud2) > 0 {
  397. for _, f := range masterConfig.RuleOfCh64wRLslidarPointCloud2 {
  398. faultLabel = f(data, &pjisuvParam)
  399. if faultLabel != "" {
  400. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  401. lastGlobalTriggerTime = thisGlobalTriggerTime
  402. thisGlobalTriggerTime = time.Now()
  403. subscribersTimes[i] = thisGlobalTriggerTime
  404. goto TriggerSuccess
  405. }
  406. }
  407. }
  408. TriggerSuccess:
  409. subscribersMutexes[i].Unlock()
  410. }
  411. subscribersTimeMutexes[i].Unlock()
  412. },
  413. })
  414. }
  415. // 9
  416. if topic == masterConfig.TopicOfCh64wRScan && (len(masterConfig.RuleOfCh64wRScan1) > 0 || len(masterConfig.RuleOfCh64wRScan2) > 0) {
  417. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  418. Node: commonConfig.RosNode,
  419. Topic: topic,
  420. Callback: func(data *sensor_msgs.LaserScan) {
  421. subscribersTimeMutexes[i].Lock()
  422. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  423. subscribersMutexes[i].Lock()
  424. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  425. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  426. faultLabel := ""
  427. if len(masterConfig.RuleOfCh64wRScan1) > 0 {
  428. for _, f := range masterConfig.RuleOfCh64wRScan1 {
  429. faultLabel = f(data)
  430. if faultLabel != "" {
  431. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  432. lastGlobalTriggerTime = thisGlobalTriggerTime
  433. thisGlobalTriggerTime = time.Now()
  434. subscribersTimes[i] = thisGlobalTriggerTime
  435. goto TriggerSuccess
  436. }
  437. }
  438. }
  439. if len(masterConfig.RuleOfCh64wRScan2) > 0 {
  440. for _, f := range masterConfig.RuleOfCh64wRScan2 {
  441. faultLabel = f(data, &pjisuvParam)
  442. if faultLabel != "" {
  443. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  444. lastGlobalTriggerTime = thisGlobalTriggerTime
  445. thisGlobalTriggerTime = time.Now()
  446. subscribersTimes[i] = thisGlobalTriggerTime
  447. goto TriggerSuccess
  448. }
  449. }
  450. }
  451. TriggerSuccess:
  452. subscribersMutexes[i].Unlock()
  453. }
  454. subscribersTimeMutexes[i].Unlock()
  455. },
  456. })
  457. }
  458. // 10
  459. if topic == masterConfig.TopicOfCicvLidarclusterMovingObjects && (len(masterConfig.RuleOfCicvLidarclusterMovingObjects1) > 0 || len(masterConfig.RuleOfCicvLidarclusterMovingObjects2) > 0) {
  460. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  461. Node: commonConfig.RosNode,
  462. Topic: topic,
  463. Callback: func(data *pjisuv_msgs.PerceptionCicvMovingObjects) {
  464. subscribersTimeMutexes[i].Lock()
  465. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  466. subscribersMutexes[i].Lock()
  467. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  468. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  469. faultLabel := ""
  470. if len(masterConfig.RuleOfCicvLidarclusterMovingObjects1) > 0 {
  471. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects1 {
  472. faultLabel = f(data)
  473. if faultLabel != "" {
  474. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  475. lastGlobalTriggerTime = thisGlobalTriggerTime
  476. thisGlobalTriggerTime = time.Now()
  477. subscribersTimes[i] = thisGlobalTriggerTime
  478. goto TriggerSuccess
  479. }
  480. }
  481. }
  482. if len(masterConfig.RuleOfCicvLidarclusterMovingObjects2) > 0 {
  483. for _, f := range masterConfig.RuleOfCicvLidarclusterMovingObjects2 {
  484. faultLabel = f(data, &pjisuvParam)
  485. if faultLabel != "" {
  486. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  487. lastGlobalTriggerTime = thisGlobalTriggerTime
  488. thisGlobalTriggerTime = time.Now()
  489. subscribersTimes[i] = thisGlobalTriggerTime
  490. goto TriggerSuccess
  491. }
  492. }
  493. }
  494. TriggerSuccess:
  495. subscribersMutexes[i].Unlock()
  496. }
  497. subscribersTimeMutexes[i].Unlock()
  498. },
  499. })
  500. }
  501. // 11
  502. if topic == masterConfig.TopicOfCicvAmrTrajectory && (len(masterConfig.RuleOfCicvAmrTrajectory1) > 0 || len(masterConfig.RuleOfCicvAmrTrajectory2) > 0) {
  503. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  504. Node: commonConfig.RosNode,
  505. Topic: topic,
  506. Callback: func(data *pjisuv_msgs.Trajectory) {
  507. subscribersTimeMutexes[i].Lock()
  508. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  509. subscribersMutexes[i].Lock()
  510. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  511. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  512. faultLabel := ""
  513. if len(masterConfig.RuleOfCicvAmrTrajectory1) > 0 {
  514. for _, f := range masterConfig.RuleOfCicvAmrTrajectory1 {
  515. faultLabel = f(data)
  516. if faultLabel != "" {
  517. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  518. lastGlobalTriggerTime = thisGlobalTriggerTime
  519. thisGlobalTriggerTime = time.Now()
  520. subscribersTimes[i] = thisGlobalTriggerTime
  521. goto TriggerSuccess
  522. }
  523. }
  524. }
  525. if len(masterConfig.RuleOfCicvAmrTrajectory2) > 0 {
  526. for _, f := range masterConfig.RuleOfCicvAmrTrajectory2 {
  527. faultLabel = f(data, &pjisuvParam)
  528. if faultLabel != "" {
  529. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  530. lastGlobalTriggerTime = thisGlobalTriggerTime
  531. thisGlobalTriggerTime = time.Now()
  532. subscribersTimes[i] = thisGlobalTriggerTime
  533. goto TriggerSuccess
  534. }
  535. }
  536. }
  537. TriggerSuccess:
  538. subscribersMutexes[i].Unlock()
  539. }
  540. subscribersTimeMutexes[i].Unlock()
  541. // 触发后更新共享变量
  542. mutexOfCicvAmrTrajectory.RLock()
  543. {
  544. var currentCurvateres []float64
  545. for _, point := range data.Trajectoryinfo.Trajectorypoints {
  546. currentCurvateres = append(currentCurvateres, math.Abs(float64(point.Curvature)))
  547. }
  548. pjisuvParam.LastCurvaturesOfCicvAmrTrajectory = currentCurvateres
  549. }
  550. mutexOfCicvAmrTrajectory.RUnlock()
  551. },
  552. })
  553. }
  554. // 12
  555. if topic == masterConfig.TopicOfCicvLocation && (len(masterConfig.RuleOfCicvLocation1) > 0 || len(masterConfig.RuleOfCicvLocation2) > 0) {
  556. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  557. Node: commonConfig.RosNode,
  558. Topic: topic,
  559. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  560. // 更新共享变量
  561. mutexOfCicvLocation.RLock()
  562. {
  563. pjisuvParam.VelocityXOfCicvLocation = data.VelocityX
  564. pjisuvParam.VelocityYOfCicvLocation = data.VelocityY
  565. pjisuvParam.VelocityZOfCicvLocation = data.VelocityZ
  566. pjisuvParam.YawOfCicvLocation = data.Yaw
  567. pjisuvParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
  568. pjisuvParam.PositionXOfCicvLocation = data.PositionX
  569. pjisuvParam.PositionYOfCicvLocation = data.PositionY
  570. }
  571. mutexOfCicvLocation.RUnlock()
  572. subscribersTimeMutexes[i].Lock()
  573. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  574. subscribersMutexes[i].Lock()
  575. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  576. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  577. faultLabel := ""
  578. if len(masterConfig.RuleOfCicvLocation1) > 0 {
  579. for _, f := range masterConfig.RuleOfCicvLocation1 {
  580. faultLabel = f(data)
  581. if faultLabel != "" {
  582. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  583. lastGlobalTriggerTime = thisGlobalTriggerTime
  584. thisGlobalTriggerTime = time.Now()
  585. subscribersTimes[i] = thisGlobalTriggerTime
  586. goto TriggerSuccess
  587. }
  588. }
  589. }
  590. if len(masterConfig.RuleOfCicvLocation2) > 0 {
  591. for _, f := range masterConfig.RuleOfCicvLocation2 {
  592. faultLabel = f(data, &pjisuvParam)
  593. if faultLabel != "" {
  594. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  595. lastGlobalTriggerTime = thisGlobalTriggerTime
  596. thisGlobalTriggerTime = time.Now()
  597. subscribersTimes[i] = thisGlobalTriggerTime
  598. goto TriggerSuccess
  599. }
  600. }
  601. }
  602. TriggerSuccess:
  603. subscribersMutexes[i].Unlock()
  604. }
  605. subscribersTimeMutexes[i].Unlock()
  606. },
  607. })
  608. }
  609. // 13
  610. if topic == masterConfig.TopicOfCloudClusters && (len(masterConfig.RuleOfCloudClusters1) > 0 || len(masterConfig.RuleOfCloudClusters2) > 0) {
  611. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  612. Node: commonConfig.RosNode,
  613. Topic: topic,
  614. Callback: func(data *pjisuv_msgs.AutowareCloudClusterArray) {
  615. subscribersTimeMutexes[i].Lock()
  616. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  617. subscribersMutexes[i].Lock()
  618. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  619. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  620. faultLabel := ""
  621. if len(masterConfig.RuleOfCloudClusters1) > 0 {
  622. for _, f := range masterConfig.RuleOfCloudClusters1 {
  623. faultLabel = f(data)
  624. if faultLabel != "" {
  625. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  626. lastGlobalTriggerTime = thisGlobalTriggerTime
  627. thisGlobalTriggerTime = time.Now()
  628. subscribersTimes[i] = thisGlobalTriggerTime
  629. goto TriggerSuccess
  630. }
  631. }
  632. }
  633. if len(masterConfig.RuleOfCloudClusters2) > 0 {
  634. for _, f := range masterConfig.RuleOfCloudClusters2 {
  635. faultLabel = f(data, &pjisuvParam)
  636. if faultLabel != "" {
  637. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  638. lastGlobalTriggerTime = thisGlobalTriggerTime
  639. thisGlobalTriggerTime = time.Now()
  640. subscribersTimes[i] = thisGlobalTriggerTime
  641. goto TriggerSuccess
  642. }
  643. }
  644. }
  645. TriggerSuccess:
  646. subscribersMutexes[i].Unlock()
  647. }
  648. subscribersTimeMutexes[i].Unlock()
  649. },
  650. })
  651. }
  652. // 14
  653. if topic == masterConfig.TopicOfHeartbeatInfo && (len(masterConfig.RuleOfHeartbeatInfo1) > 0 || len(masterConfig.RuleOfHeartbeatInfo2) > 0) {
  654. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  655. Node: commonConfig.RosNode,
  656. Topic: topic,
  657. Callback: func(data *pjisuv_msgs.HeartBeatInfo) {
  658. subscribersTimeMutexes[i].Lock()
  659. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  660. subscribersMutexes[i].Lock()
  661. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  662. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  663. faultLabel := ""
  664. if len(masterConfig.RuleOfHeartbeatInfo1) > 0 {
  665. for _, f := range masterConfig.RuleOfHeartbeatInfo1 {
  666. faultLabel = f(data)
  667. if faultLabel != "" {
  668. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  669. lastGlobalTriggerTime = thisGlobalTriggerTime
  670. thisGlobalTriggerTime = time.Now()
  671. subscribersTimes[i] = thisGlobalTriggerTime
  672. goto TriggerSuccess
  673. }
  674. }
  675. }
  676. if len(masterConfig.RuleOfHeartbeatInfo2) > 0 {
  677. for _, f := range masterConfig.RuleOfHeartbeatInfo2 {
  678. faultLabel = f(data, &pjisuvParam)
  679. if faultLabel != "" {
  680. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  681. lastGlobalTriggerTime = thisGlobalTriggerTime
  682. thisGlobalTriggerTime = time.Now()
  683. subscribersTimes[i] = thisGlobalTriggerTime
  684. goto TriggerSuccess
  685. }
  686. }
  687. }
  688. TriggerSuccess:
  689. subscribersMutexes[i].Unlock()
  690. }
  691. subscribersTimeMutexes[i].Unlock()
  692. },
  693. })
  694. }
  695. // 15
  696. if topic == masterConfig.TopicOfLidarPretreatmentCost && (len(masterConfig.RuleOfLidarPretreatmentCost1) > 0 || len(masterConfig.RuleOfLidarPretreatmentCost2) > 0) {
  697. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  698. Node: commonConfig.RosNode,
  699. Topic: topic,
  700. Callback: func(data *geometry_msgs.Vector3Stamped) {
  701. subscribersTimeMutexes[i].Lock()
  702. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  703. subscribersMutexes[i].Lock()
  704. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  705. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  706. faultLabel := ""
  707. if len(masterConfig.RuleOfLidarPretreatmentCost1) > 0 {
  708. for _, f := range masterConfig.RuleOfLidarPretreatmentCost1 {
  709. faultLabel = f(data)
  710. if faultLabel != "" {
  711. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  712. lastGlobalTriggerTime = thisGlobalTriggerTime
  713. thisGlobalTriggerTime = time.Now()
  714. subscribersTimes[i] = thisGlobalTriggerTime
  715. goto TriggerSuccess
  716. }
  717. }
  718. }
  719. if len(masterConfig.RuleOfLidarPretreatmentCost2) > 0 {
  720. for _, f := range masterConfig.RuleOfLidarPretreatmentCost2 {
  721. faultLabel = f(data, &pjisuvParam)
  722. if faultLabel != "" {
  723. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  724. lastGlobalTriggerTime = thisGlobalTriggerTime
  725. thisGlobalTriggerTime = time.Now()
  726. subscribersTimes[i] = thisGlobalTriggerTime
  727. goto TriggerSuccess
  728. }
  729. }
  730. }
  731. TriggerSuccess:
  732. subscribersMutexes[i].Unlock()
  733. }
  734. subscribersTimeMutexes[i].Unlock()
  735. },
  736. })
  737. }
  738. // 16
  739. if topic == masterConfig.TopicOfLidarPretreatmentOdometry && (len(masterConfig.RuleOfLidarPretreatmentOdometry1) > 0 || len(masterConfig.RuleOfLidarPretreatmentOdometry2) > 0) {
  740. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  741. Node: commonConfig.RosNode,
  742. Topic: topic,
  743. Callback: func(data *nav_msgs.Odometry) {
  744. subscribersTimeMutexes[i].Lock()
  745. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  746. subscribersMutexes[i].Lock()
  747. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  748. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  749. faultLabel := ""
  750. if len(masterConfig.RuleOfLidarPretreatmentOdometry1) > 0 {
  751. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry1 {
  752. faultLabel = f(data)
  753. if faultLabel != "" {
  754. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  755. lastGlobalTriggerTime = thisGlobalTriggerTime
  756. thisGlobalTriggerTime = time.Now()
  757. subscribersTimes[i] = thisGlobalTriggerTime
  758. goto TriggerSuccess
  759. }
  760. }
  761. }
  762. if len(masterConfig.RuleOfLidarPretreatmentOdometry2) > 0 {
  763. for _, f := range masterConfig.RuleOfLidarPretreatmentOdometry2 {
  764. faultLabel = f(data, &pjisuvParam)
  765. if faultLabel != "" {
  766. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  767. lastGlobalTriggerTime = thisGlobalTriggerTime
  768. thisGlobalTriggerTime = time.Now()
  769. subscribersTimes[i] = thisGlobalTriggerTime
  770. goto TriggerSuccess
  771. }
  772. }
  773. }
  774. TriggerSuccess:
  775. subscribersMutexes[i].Unlock()
  776. }
  777. subscribersTimeMutexes[i].Unlock()
  778. },
  779. })
  780. }
  781. // 17
  782. if topic == masterConfig.TopicOfLidarRoi && (len(masterConfig.RuleOfLidarRoi1) > 0 || len(masterConfig.RuleOfLidarRoi2) > 0) {
  783. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  784. Node: commonConfig.RosNode,
  785. Topic: topic,
  786. Callback: func(data *geometry_msgs.PolygonStamped) {
  787. subscribersTimeMutexes[i].Lock()
  788. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  789. subscribersMutexes[i].Lock()
  790. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  791. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  792. faultLabel := ""
  793. if len(masterConfig.RuleOfLidarRoi1) > 0 {
  794. for _, f := range masterConfig.RuleOfLidarRoi1 {
  795. faultLabel = f(data)
  796. if faultLabel != "" {
  797. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  798. lastGlobalTriggerTime = thisGlobalTriggerTime
  799. thisGlobalTriggerTime = time.Now()
  800. subscribersTimes[i] = thisGlobalTriggerTime
  801. goto TriggerSuccess
  802. }
  803. }
  804. }
  805. if len(masterConfig.RuleOfLidarRoi2) > 0 {
  806. for _, f := range masterConfig.RuleOfLidarRoi2 {
  807. faultLabel = f(data, &pjisuvParam)
  808. if faultLabel != "" {
  809. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  810. lastGlobalTriggerTime = thisGlobalTriggerTime
  811. thisGlobalTriggerTime = time.Now()
  812. subscribersTimes[i] = thisGlobalTriggerTime
  813. goto TriggerSuccess
  814. }
  815. }
  816. }
  817. TriggerSuccess:
  818. subscribersMutexes[i].Unlock()
  819. }
  820. subscribersTimeMutexes[i].Unlock()
  821. },
  822. })
  823. }
  824. // 18
  825. if topic == masterConfig.TopicOfLine1 && (len(masterConfig.RuleOfLine11) > 0 || len(masterConfig.RuleOfLine12) > 0) {
  826. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  827. Node: commonConfig.RosNode,
  828. Topic: topic,
  829. Callback: func(data *nav_msgs.Path) {
  830. subscribersTimeMutexes[i].Lock()
  831. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  832. subscribersMutexes[i].Lock()
  833. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  834. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  835. faultLabel := ""
  836. if len(masterConfig.RuleOfLine11) > 0 {
  837. for _, f := range masterConfig.RuleOfLine11 {
  838. faultLabel = f(data)
  839. if faultLabel != "" {
  840. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  841. lastGlobalTriggerTime = thisGlobalTriggerTime
  842. thisGlobalTriggerTime = time.Now()
  843. subscribersTimes[i] = thisGlobalTriggerTime
  844. goto TriggerSuccess
  845. }
  846. }
  847. }
  848. if len(masterConfig.RuleOfLine12) > 0 {
  849. for _, f := range masterConfig.RuleOfLine12 {
  850. faultLabel = f(data, &pjisuvParam)
  851. if faultLabel != "" {
  852. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  853. lastGlobalTriggerTime = thisGlobalTriggerTime
  854. thisGlobalTriggerTime = time.Now()
  855. subscribersTimes[i] = thisGlobalTriggerTime
  856. goto TriggerSuccess
  857. }
  858. }
  859. }
  860. TriggerSuccess:
  861. subscribersMutexes[i].Unlock()
  862. }
  863. subscribersTimeMutexes[i].Unlock()
  864. },
  865. })
  866. }
  867. // 19
  868. if topic == masterConfig.TopicOfLine2 && (len(masterConfig.RuleOfLine21) > 0 || len(masterConfig.RuleOfLine22) > 0) {
  869. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  870. Node: commonConfig.RosNode,
  871. Topic: topic,
  872. Callback: func(data *nav_msgs.Path) {
  873. subscribersTimeMutexes[i].Lock()
  874. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  875. subscribersMutexes[i].Lock()
  876. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  877. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  878. faultLabel := ""
  879. if len(masterConfig.RuleOfLine21) > 0 {
  880. for _, f := range masterConfig.RuleOfLine21 {
  881. faultLabel = f(data)
  882. if faultLabel != "" {
  883. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  884. lastGlobalTriggerTime = thisGlobalTriggerTime
  885. thisGlobalTriggerTime = time.Now()
  886. subscribersTimes[i] = thisGlobalTriggerTime
  887. goto TriggerSuccess
  888. }
  889. }
  890. }
  891. if len(masterConfig.RuleOfLine22) > 0 {
  892. for _, f := range masterConfig.RuleOfLine22 {
  893. faultLabel = f(data, &pjisuvParam)
  894. if faultLabel != "" {
  895. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  896. lastGlobalTriggerTime = thisGlobalTriggerTime
  897. thisGlobalTriggerTime = time.Now()
  898. subscribersTimes[i] = thisGlobalTriggerTime
  899. goto TriggerSuccess
  900. }
  901. }
  902. }
  903. TriggerSuccess:
  904. subscribersMutexes[i].Unlock()
  905. }
  906. subscribersTimeMutexes[i].Unlock()
  907. },
  908. })
  909. }
  910. // 20
  911. if topic == masterConfig.TopicOfMapPolygon && (len(masterConfig.RuleOfMapPolygon1) > 0 || len(masterConfig.RuleOfMapPolygon2) > 0) {
  912. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  913. Node: commonConfig.RosNode,
  914. Topic: topic,
  915. Callback: func(data *pjisuv_msgs.PolygonStamped) {
  916. subscribersTimeMutexes[i].Lock()
  917. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  918. subscribersMutexes[i].Lock()
  919. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  920. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  921. faultLabel := ""
  922. if len(masterConfig.RuleOfMapPolygon1) > 0 {
  923. for _, f := range masterConfig.RuleOfMapPolygon1 {
  924. faultLabel = f(data)
  925. if faultLabel != "" {
  926. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  927. lastGlobalTriggerTime = thisGlobalTriggerTime
  928. thisGlobalTriggerTime = time.Now()
  929. subscribersTimes[i] = thisGlobalTriggerTime
  930. goto TriggerSuccess
  931. }
  932. }
  933. }
  934. if len(masterConfig.RuleOfMapPolygon2) > 0 {
  935. for _, f := range masterConfig.RuleOfMapPolygon2 {
  936. faultLabel = f(data, &pjisuvParam)
  937. if faultLabel != "" {
  938. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  939. lastGlobalTriggerTime = thisGlobalTriggerTime
  940. thisGlobalTriggerTime = time.Now()
  941. subscribersTimes[i] = thisGlobalTriggerTime
  942. goto TriggerSuccess
  943. }
  944. }
  945. }
  946. TriggerSuccess:
  947. subscribersMutexes[i].Unlock()
  948. }
  949. subscribersTimeMutexes[i].Unlock()
  950. },
  951. })
  952. }
  953. // 21
  954. if topic == masterConfig.TopicOfObstacleDisplay && (len(masterConfig.RuleOfObstacleDisplay1) > 0 || len(masterConfig.RuleOfObstacleDisplay2) > 0) {
  955. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  956. Node: commonConfig.RosNode,
  957. Topic: topic,
  958. Callback: func(data *visualization_msgs.MarkerArray) {
  959. subscribersTimeMutexes[i].Lock()
  960. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  961. subscribersMutexes[i].Lock()
  962. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  963. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  964. faultLabel := ""
  965. if len(masterConfig.RuleOfObstacleDisplay1) > 0 {
  966. for _, f := range masterConfig.RuleOfObstacleDisplay1 {
  967. faultLabel = f(data)
  968. if faultLabel != "" {
  969. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  970. lastGlobalTriggerTime = thisGlobalTriggerTime
  971. thisGlobalTriggerTime = time.Now()
  972. subscribersTimes[i] = thisGlobalTriggerTime
  973. goto TriggerSuccess
  974. }
  975. }
  976. }
  977. if len(masterConfig.RuleOfObstacleDisplay2) > 0 {
  978. for _, f := range masterConfig.RuleOfObstacleDisplay2 {
  979. faultLabel = f(data, &pjisuvParam)
  980. if faultLabel != "" {
  981. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  982. lastGlobalTriggerTime = thisGlobalTriggerTime
  983. thisGlobalTriggerTime = time.Now()
  984. subscribersTimes[i] = thisGlobalTriggerTime
  985. goto TriggerSuccess
  986. }
  987. }
  988. }
  989. TriggerSuccess:
  990. subscribersMutexes[i].Unlock()
  991. }
  992. subscribersTimeMutexes[i].Unlock()
  993. },
  994. })
  995. }
  996. // 22
  997. if topic == masterConfig.TopicOfPjControlPub && (len(masterConfig.RuleOfPjControlPub1) > 0 || len(masterConfig.RuleOfPjControlPub2) > 0) {
  998. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  999. Node: commonConfig.RosNode,
  1000. Topic: topic,
  1001. Callback: func(data *pjisuv_msgs.CommonVehicleCmd) {
  1002. // 更新共享变量
  1003. mutexOfPjControlPub.RLock()
  1004. {
  1005. pjisuvParam.NumCountPjiControlCommandOfPjControlPub++
  1006. if pjisuvParam.NumCountPjiControlCommandOfPjControlPub == 10 {
  1007. pjisuvParam.EgoSteeringCmdOfPjControlPub = append(pjisuvParam.EgoSteeringCmdOfPjControlPub, data.ICPVCmdStrAngle)
  1008. pjisuvParam.EgoThrottleCmdOfPjControlPub = append(pjisuvParam.EgoThrottleCmdOfPjControlPub, data.ICPVCmdAccPelPosAct)
  1009. pjisuvParam.NumCountPjiControlCommandOfPjControlPub = 0
  1010. }
  1011. }
  1012. mutexOfPjControlPub.RUnlock()
  1013. subscribersTimeMutexes[i].Lock()
  1014. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1015. subscribersMutexes[i].Lock()
  1016. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1017. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1018. faultLabel := ""
  1019. if len(masterConfig.RuleOfPjControlPub1) > 0 {
  1020. for _, f := range masterConfig.RuleOfPjControlPub1 {
  1021. faultLabel = f(data)
  1022. if faultLabel != "" {
  1023. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1024. lastGlobalTriggerTime = thisGlobalTriggerTime
  1025. thisGlobalTriggerTime = time.Now()
  1026. subscribersTimes[i] = thisGlobalTriggerTime
  1027. goto TriggerSuccess
  1028. }
  1029. }
  1030. }
  1031. if len(masterConfig.RuleOfPjControlPub2) > 0 {
  1032. for _, f := range masterConfig.RuleOfPjControlPub2 {
  1033. faultLabel = f(data, &pjisuvParam)
  1034. if faultLabel != "" {
  1035. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1036. lastGlobalTriggerTime = thisGlobalTriggerTime
  1037. thisGlobalTriggerTime = time.Now()
  1038. subscribersTimes[i] = thisGlobalTriggerTime
  1039. goto TriggerSuccess
  1040. }
  1041. }
  1042. }
  1043. TriggerSuccess:
  1044. subscribersMutexes[i].Unlock()
  1045. }
  1046. subscribersTimeMutexes[i].Unlock()
  1047. },
  1048. })
  1049. }
  1050. // 23
  1051. if topic == masterConfig.TopicOfPointsCluster && (len(masterConfig.RuleOfPointsCluster1) > 0 || len(masterConfig.RuleOfPointsCluster2) > 0) {
  1052. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1053. Node: commonConfig.RosNode,
  1054. Topic: topic,
  1055. Callback: func(data *sensor_msgs.PointCloud2) {
  1056. subscribersTimeMutexes[i].Lock()
  1057. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1058. subscribersMutexes[i].Lock()
  1059. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1060. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1061. faultLabel := ""
  1062. if len(masterConfig.RuleOfPointsCluster1) > 0 {
  1063. for _, f := range masterConfig.RuleOfPointsCluster1 {
  1064. faultLabel = f(data)
  1065. if faultLabel != "" {
  1066. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1067. lastGlobalTriggerTime = thisGlobalTriggerTime
  1068. thisGlobalTriggerTime = time.Now()
  1069. subscribersTimes[i] = thisGlobalTriggerTime
  1070. goto TriggerSuccess
  1071. }
  1072. }
  1073. }
  1074. if len(masterConfig.RuleOfPointsCluster2) > 0 {
  1075. for _, f := range masterConfig.RuleOfPointsCluster2 {
  1076. faultLabel = f(data, &pjisuvParam)
  1077. if faultLabel != "" {
  1078. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1079. lastGlobalTriggerTime = thisGlobalTriggerTime
  1080. thisGlobalTriggerTime = time.Now()
  1081. subscribersTimes[i] = thisGlobalTriggerTime
  1082. goto TriggerSuccess
  1083. }
  1084. }
  1085. }
  1086. TriggerSuccess:
  1087. subscribersMutexes[i].Unlock()
  1088. }
  1089. subscribersTimeMutexes[i].Unlock()
  1090. },
  1091. })
  1092. }
  1093. // 24
  1094. if topic == masterConfig.TopicOfPointsConcat && (len(masterConfig.RuleOfPointsConcat1) > 0 || len(masterConfig.RuleOfPointsConcat2) > 0) {
  1095. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1096. Node: commonConfig.RosNode,
  1097. Topic: topic,
  1098. Callback: func(data *sensor_msgs.PointCloud2) {
  1099. subscribersTimeMutexes[i].Lock()
  1100. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1101. subscribersMutexes[i].Lock()
  1102. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1103. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1104. faultLabel := ""
  1105. if len(masterConfig.RuleOfPointsConcat1) > 0 {
  1106. for _, f := range masterConfig.RuleOfPointsConcat1 {
  1107. faultLabel = f(data)
  1108. if faultLabel != "" {
  1109. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1110. lastGlobalTriggerTime = thisGlobalTriggerTime
  1111. thisGlobalTriggerTime = time.Now()
  1112. subscribersTimes[i] = thisGlobalTriggerTime
  1113. goto TriggerSuccess
  1114. }
  1115. }
  1116. }
  1117. if len(masterConfig.RuleOfPointsConcat2) > 0 {
  1118. for _, f := range masterConfig.RuleOfPointsConcat2 {
  1119. faultLabel = f(data, &pjisuvParam)
  1120. if faultLabel != "" {
  1121. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1122. lastGlobalTriggerTime = thisGlobalTriggerTime
  1123. thisGlobalTriggerTime = time.Now()
  1124. subscribersTimes[i] = thisGlobalTriggerTime
  1125. goto TriggerSuccess
  1126. }
  1127. }
  1128. }
  1129. TriggerSuccess:
  1130. subscribersMutexes[i].Unlock()
  1131. }
  1132. subscribersTimeMutexes[i].Unlock()
  1133. },
  1134. })
  1135. }
  1136. // 25
  1137. if topic == masterConfig.TopicOfReferenceDisplay && (len(masterConfig.RuleOfReferenceDisplay1) > 0 || len(masterConfig.RuleOfReferenceDisplay2) > 0) {
  1138. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1139. Node: commonConfig.RosNode,
  1140. Topic: topic,
  1141. Callback: func(data *nav_msgs.Path) {
  1142. subscribersTimeMutexes[i].Lock()
  1143. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1144. subscribersMutexes[i].Lock()
  1145. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1146. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1147. faultLabel := ""
  1148. if len(masterConfig.RuleOfReferenceDisplay1) > 0 {
  1149. for _, f := range masterConfig.RuleOfReferenceDisplay1 {
  1150. faultLabel = f(data)
  1151. if faultLabel != "" {
  1152. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1153. lastGlobalTriggerTime = thisGlobalTriggerTime
  1154. thisGlobalTriggerTime = time.Now()
  1155. subscribersTimes[i] = thisGlobalTriggerTime
  1156. goto TriggerSuccess
  1157. }
  1158. }
  1159. }
  1160. if len(masterConfig.RuleOfReferenceDisplay2) > 0 {
  1161. for _, f := range masterConfig.RuleOfReferenceDisplay2 {
  1162. faultLabel = f(data, &pjisuvParam)
  1163. if faultLabel != "" {
  1164. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1165. lastGlobalTriggerTime = thisGlobalTriggerTime
  1166. thisGlobalTriggerTime = time.Now()
  1167. subscribersTimes[i] = thisGlobalTriggerTime
  1168. goto TriggerSuccess
  1169. }
  1170. }
  1171. }
  1172. TriggerSuccess:
  1173. subscribersMutexes[i].Unlock()
  1174. }
  1175. subscribersTimeMutexes[i].Unlock()
  1176. },
  1177. })
  1178. }
  1179. // 26
  1180. if topic == masterConfig.TopicOfReferenceTrajectory && (len(masterConfig.RuleOfReferenceTrajectory1) > 0 || len(masterConfig.RuleOfReferenceTrajectory2) > 0) {
  1181. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1182. Node: commonConfig.RosNode,
  1183. Topic: topic,
  1184. Callback: func(data *pjisuv_msgs.Trajectory) {
  1185. subscribersTimeMutexes[i].Lock()
  1186. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1187. subscribersMutexes[i].Lock()
  1188. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1189. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1190. faultLabel := ""
  1191. if len(masterConfig.RuleOfReferenceTrajectory1) > 0 {
  1192. for _, f := range masterConfig.RuleOfReferenceTrajectory1 {
  1193. faultLabel = f(data)
  1194. if faultLabel != "" {
  1195. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1196. lastGlobalTriggerTime = thisGlobalTriggerTime
  1197. thisGlobalTriggerTime = time.Now()
  1198. subscribersTimes[i] = thisGlobalTriggerTime
  1199. goto TriggerSuccess
  1200. }
  1201. }
  1202. }
  1203. if len(masterConfig.RuleOfReferenceTrajectory2) > 0 {
  1204. for _, f := range masterConfig.RuleOfReferenceTrajectory2 {
  1205. faultLabel = f(data, &pjisuvParam)
  1206. if faultLabel != "" {
  1207. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1208. lastGlobalTriggerTime = thisGlobalTriggerTime
  1209. thisGlobalTriggerTime = time.Now()
  1210. subscribersTimes[i] = thisGlobalTriggerTime
  1211. goto TriggerSuccess
  1212. }
  1213. }
  1214. }
  1215. TriggerSuccess:
  1216. subscribersMutexes[i].Unlock()
  1217. }
  1218. subscribersTimeMutexes[i].Unlock()
  1219. },
  1220. })
  1221. }
  1222. // 27
  1223. if topic == masterConfig.TopicOfRoiPoints && (len(masterConfig.RuleOfRoiPoints1) > 0 || len(masterConfig.RuleOfRoiPoints2) > 0) {
  1224. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1225. Node: commonConfig.RosNode,
  1226. Topic: topic,
  1227. Callback: func(data *sensor_msgs.PointCloud2) {
  1228. subscribersTimeMutexes[i].Lock()
  1229. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1230. subscribersMutexes[i].Lock()
  1231. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1232. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1233. faultLabel := ""
  1234. if len(masterConfig.RuleOfRoiPoints1) > 0 {
  1235. for _, f := range masterConfig.RuleOfRoiPoints1 {
  1236. faultLabel = f(data)
  1237. if faultLabel != "" {
  1238. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1239. lastGlobalTriggerTime = thisGlobalTriggerTime
  1240. thisGlobalTriggerTime = time.Now()
  1241. subscribersTimes[i] = thisGlobalTriggerTime
  1242. goto TriggerSuccess
  1243. }
  1244. }
  1245. }
  1246. if len(masterConfig.RuleOfRoiPoints2) > 0 {
  1247. for _, f := range masterConfig.RuleOfRoiPoints2 {
  1248. faultLabel = f(data, &pjisuvParam)
  1249. if faultLabel != "" {
  1250. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1251. lastGlobalTriggerTime = thisGlobalTriggerTime
  1252. thisGlobalTriggerTime = time.Now()
  1253. subscribersTimes[i] = thisGlobalTriggerTime
  1254. goto TriggerSuccess
  1255. }
  1256. }
  1257. }
  1258. TriggerSuccess:
  1259. subscribersMutexes[i].Unlock()
  1260. }
  1261. subscribersTimeMutexes[i].Unlock()
  1262. },
  1263. })
  1264. }
  1265. // 28
  1266. if topic == masterConfig.TopicOfRoiPolygon && (len(masterConfig.RuleOfRoiPolygon1) > 0 || len(masterConfig.RuleOfRoiPolygon2) > 0) {
  1267. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1268. Node: commonConfig.RosNode,
  1269. Topic: topic,
  1270. Callback: func(data *nav_msgs.Path) {
  1271. subscribersTimeMutexes[i].Lock()
  1272. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1273. subscribersMutexes[i].Lock()
  1274. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1275. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1276. faultLabel := ""
  1277. if len(masterConfig.RuleOfRoiPolygon1) > 0 {
  1278. for _, f := range masterConfig.RuleOfRoiPolygon1 {
  1279. faultLabel = f(data)
  1280. if faultLabel != "" {
  1281. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1282. lastGlobalTriggerTime = thisGlobalTriggerTime
  1283. thisGlobalTriggerTime = time.Now()
  1284. subscribersTimes[i] = thisGlobalTriggerTime
  1285. goto TriggerSuccess
  1286. }
  1287. }
  1288. }
  1289. if len(masterConfig.RuleOfRoiPolygon2) > 0 {
  1290. for _, f := range masterConfig.RuleOfRoiPolygon2 {
  1291. faultLabel = f(data, &pjisuvParam)
  1292. if faultLabel != "" {
  1293. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1294. lastGlobalTriggerTime = thisGlobalTriggerTime
  1295. thisGlobalTriggerTime = time.Now()
  1296. subscribersTimes[i] = thisGlobalTriggerTime
  1297. goto TriggerSuccess
  1298. }
  1299. }
  1300. }
  1301. TriggerSuccess:
  1302. subscribersMutexes[i].Unlock()
  1303. }
  1304. subscribersTimeMutexes[i].Unlock()
  1305. },
  1306. })
  1307. }
  1308. // 29
  1309. if topic == masterConfig.TopicOfTf && (len(masterConfig.RuleOfTf1) > 0 || len(masterConfig.RuleOfTf2) > 0) {
  1310. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1311. Node: commonConfig.RosNode,
  1312. Topic: topic,
  1313. Callback: func(data *tf2_msgs.TFMessage) {
  1314. subscribersTimeMutexes[i].Lock()
  1315. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1316. subscribersMutexes[i].Lock()
  1317. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1318. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1319. faultLabel := ""
  1320. if len(masterConfig.RuleOfTf1) > 0 {
  1321. for _, f := range masterConfig.RuleOfTf1 {
  1322. faultLabel = f(data)
  1323. if faultLabel != "" {
  1324. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1325. lastGlobalTriggerTime = thisGlobalTriggerTime
  1326. thisGlobalTriggerTime = time.Now()
  1327. subscribersTimes[i] = thisGlobalTriggerTime
  1328. goto TriggerSuccess
  1329. }
  1330. }
  1331. }
  1332. if len(masterConfig.RuleOfTf2) > 0 {
  1333. for _, f := range masterConfig.RuleOfTf2 {
  1334. faultLabel = f(data, &pjisuvParam)
  1335. if faultLabel != "" {
  1336. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1337. lastGlobalTriggerTime = thisGlobalTriggerTime
  1338. thisGlobalTriggerTime = time.Now()
  1339. subscribersTimes[i] = thisGlobalTriggerTime
  1340. goto TriggerSuccess
  1341. }
  1342. }
  1343. }
  1344. TriggerSuccess:
  1345. subscribersMutexes[i].Unlock()
  1346. }
  1347. subscribersTimeMutexes[i].Unlock()
  1348. },
  1349. })
  1350. }
  1351. // 30
  1352. if topic == masterConfig.TopicOfTpperception && (len(masterConfig.RuleOfTpperception1) > 0 || len(masterConfig.RuleOfTpperception2) > 0) {
  1353. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1354. Node: commonConfig.RosNode,
  1355. Topic: topic,
  1356. Callback: func(data *pjisuv_msgs.PerceptionObjects) {
  1357. // 更新共享变量
  1358. mutexOfTpperception.RLock()
  1359. {
  1360. for _, obj := range data.Objs {
  1361. if obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  1362. continue
  1363. }
  1364. // 检查 ObjDicOfTpperception 是否为 nil,如果是,则初始化它
  1365. if pjisuvParam.ObjDicOfTpperception == nil {
  1366. pjisuvParam.ObjDicOfTpperception = make(map[uint32][]float32)
  1367. }
  1368. if _, ok := pjisuvParam.ObjDicOfTpperception[obj.Id]; !ok {
  1369. pjisuvParam.ObjDicOfTpperception[obj.Id] = []float32{}
  1370. }
  1371. pjisuvParam.ObjDicOfTpperception[obj.Id] = append(pjisuvParam.ObjDicOfTpperception[obj.Id], obj.Y)
  1372. }
  1373. }
  1374. mutexOfTpperception.RUnlock()
  1375. subscribersTimeMutexes[i].Lock()
  1376. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1377. subscribersMutexes[i].Lock()
  1378. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1379. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1380. faultLabel := ""
  1381. if len(masterConfig.RuleOfTpperception1) > 0 {
  1382. for _, f := range masterConfig.RuleOfTpperception1 {
  1383. faultLabel = f(data)
  1384. if faultLabel != "" {
  1385. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1386. lastGlobalTriggerTime = thisGlobalTriggerTime
  1387. thisGlobalTriggerTime = time.Now()
  1388. subscribersTimes[i] = thisGlobalTriggerTime
  1389. goto TriggerSuccess
  1390. }
  1391. }
  1392. }
  1393. if len(masterConfig.RuleOfTpperception2) > 0 {
  1394. for _, f := range masterConfig.RuleOfTpperception2 {
  1395. faultLabel = f(data, &pjisuvParam)
  1396. if faultLabel != "" {
  1397. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1398. lastGlobalTriggerTime = thisGlobalTriggerTime
  1399. thisGlobalTriggerTime = time.Now()
  1400. subscribersTimes[i] = thisGlobalTriggerTime
  1401. goto TriggerSuccess
  1402. }
  1403. }
  1404. }
  1405. TriggerSuccess:
  1406. subscribersMutexes[i].Unlock()
  1407. }
  1408. subscribersTimeMutexes[i].Unlock()
  1409. // -------- 触发后更新共享变量
  1410. mutexOfTpperception.RLock()
  1411. {
  1412. for _, obj := range data.Objs {
  1413. pjisuvParam.ObjTypeDicOfTpperception[obj.Id] = obj.Type
  1414. pjisuvParam.ObjSpeedDicOfTpperception[obj.Id] = math.Pow(math.Pow(float64(obj.Vxabs), 2)+math.Pow(float64(obj.Vyabs), 2), 0.5)
  1415. }
  1416. }
  1417. mutexOfTpperception.RUnlock()
  1418. },
  1419. })
  1420. }
  1421. // 31
  1422. if topic == masterConfig.TopicOfTpperceptionVis && (len(masterConfig.RuleOfTpperceptionVis1) > 0 || len(masterConfig.RuleOfTpperceptionVis2) > 0) {
  1423. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1424. Node: commonConfig.RosNode,
  1425. Topic: topic,
  1426. Callback: func(data *visualization_msgs.MarkerArray) {
  1427. subscribersTimeMutexes[i].Lock()
  1428. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1429. subscribersMutexes[i].Lock()
  1430. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1431. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1432. faultLabel := ""
  1433. if len(masterConfig.RuleOfTpperceptionVis1) > 0 {
  1434. for _, f := range masterConfig.RuleOfTpperceptionVis1 {
  1435. faultLabel = f(data)
  1436. if faultLabel != "" {
  1437. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1438. lastGlobalTriggerTime = thisGlobalTriggerTime
  1439. thisGlobalTriggerTime = time.Now()
  1440. subscribersTimes[i] = thisGlobalTriggerTime
  1441. goto TriggerSuccess
  1442. }
  1443. }
  1444. }
  1445. if len(masterConfig.RuleOfTpperceptionVis2) > 0 {
  1446. for _, f := range masterConfig.RuleOfTpperceptionVis2 {
  1447. faultLabel = f(data, &pjisuvParam)
  1448. if faultLabel != "" {
  1449. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1450. lastGlobalTriggerTime = thisGlobalTriggerTime
  1451. thisGlobalTriggerTime = time.Now()
  1452. subscribersTimes[i] = thisGlobalTriggerTime
  1453. goto TriggerSuccess
  1454. }
  1455. }
  1456. }
  1457. TriggerSuccess:
  1458. subscribersMutexes[i].Unlock()
  1459. }
  1460. subscribersTimeMutexes[i].Unlock()
  1461. },
  1462. })
  1463. }
  1464. // 32
  1465. if topic == masterConfig.TopicOfTprouteplan && (len(masterConfig.RuleOfTprouteplan1) > 0 || len(masterConfig.RuleOfTprouteplan2) > 0) {
  1466. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1467. Node: commonConfig.RosNode,
  1468. Topic: topic,
  1469. Callback: func(data *pjisuv_msgs.RoutePlan) {
  1470. subscribersTimeMutexes[i].Lock()
  1471. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1472. subscribersMutexes[i].Lock()
  1473. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1474. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1475. faultLabel := ""
  1476. if len(masterConfig.RuleOfTprouteplan1) > 0 {
  1477. for _, f := range masterConfig.RuleOfTprouteplan1 {
  1478. faultLabel = f(data)
  1479. if faultLabel != "" {
  1480. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1481. lastGlobalTriggerTime = thisGlobalTriggerTime
  1482. thisGlobalTriggerTime = time.Now()
  1483. subscribersTimes[i] = thisGlobalTriggerTime
  1484. goto TriggerSuccess
  1485. }
  1486. }
  1487. }
  1488. if len(masterConfig.RuleOfTprouteplan2) > 0 {
  1489. for _, f := range masterConfig.RuleOfTprouteplan2 {
  1490. faultLabel = f(data, &pjisuvParam)
  1491. if faultLabel != "" {
  1492. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1493. lastGlobalTriggerTime = thisGlobalTriggerTime
  1494. thisGlobalTriggerTime = time.Now()
  1495. subscribersTimes[i] = thisGlobalTriggerTime
  1496. goto TriggerSuccess
  1497. }
  1498. }
  1499. }
  1500. TriggerSuccess:
  1501. subscribersMutexes[i].Unlock()
  1502. }
  1503. subscribersTimeMutexes[i].Unlock()
  1504. },
  1505. })
  1506. }
  1507. // 33
  1508. if topic == masterConfig.TopicOfTrajectoryDisplay && (len(masterConfig.RuleOfTrajectoryDisplay1) > 0 || len(masterConfig.RuleOfTrajectoryDisplay2) > 0) {
  1509. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1510. Node: commonConfig.RosNode,
  1511. Topic: topic,
  1512. Callback: func(data *nav_msgs.Path) {
  1513. subscribersTimeMutexes[i].Lock()
  1514. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1515. subscribersMutexes[i].Lock()
  1516. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1517. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1518. faultLabel := ""
  1519. if len(masterConfig.RuleOfTrajectoryDisplay1) > 0 {
  1520. for _, f := range masterConfig.RuleOfTrajectoryDisplay1 {
  1521. faultLabel = f(data)
  1522. if faultLabel != "" {
  1523. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1524. lastGlobalTriggerTime = thisGlobalTriggerTime
  1525. thisGlobalTriggerTime = time.Now()
  1526. subscribersTimes[i] = thisGlobalTriggerTime
  1527. goto TriggerSuccess
  1528. }
  1529. }
  1530. }
  1531. if len(masterConfig.RuleOfTrajectoryDisplay2) > 0 {
  1532. for _, f := range masterConfig.RuleOfTrajectoryDisplay2 {
  1533. faultLabel = f(data, &pjisuvParam)
  1534. if faultLabel != "" {
  1535. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1536. lastGlobalTriggerTime = thisGlobalTriggerTime
  1537. thisGlobalTriggerTime = time.Now()
  1538. subscribersTimes[i] = thisGlobalTriggerTime
  1539. goto TriggerSuccess
  1540. }
  1541. }
  1542. }
  1543. TriggerSuccess:
  1544. subscribersMutexes[i].Unlock()
  1545. }
  1546. subscribersTimeMutexes[i].Unlock()
  1547. },
  1548. })
  1549. }
  1550. // 34
  1551. if topic == masterConfig.TopicOfUngroundCloudpoints && (len(masterConfig.RuleOfUngroundCloudpoints1) > 0 || len(masterConfig.RuleOfUngroundCloudpoints2) > 0) {
  1552. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1553. Node: commonConfig.RosNode,
  1554. Topic: topic,
  1555. Callback: func(data *sensor_msgs.PointCloud2) {
  1556. subscribersTimeMutexes[i].Lock()
  1557. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1558. subscribersMutexes[i].Lock()
  1559. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1560. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1561. faultLabel := ""
  1562. if len(masterConfig.RuleOfUngroundCloudpoints1) > 0 {
  1563. for _, f := range masterConfig.RuleOfUngroundCloudpoints1 {
  1564. faultLabel = f(data)
  1565. if faultLabel != "" {
  1566. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1567. lastGlobalTriggerTime = thisGlobalTriggerTime
  1568. thisGlobalTriggerTime = time.Now()
  1569. subscribersTimes[i] = thisGlobalTriggerTime
  1570. goto TriggerSuccess
  1571. }
  1572. }
  1573. }
  1574. if len(masterConfig.RuleOfUngroundCloudpoints2) > 0 {
  1575. for _, f := range masterConfig.RuleOfUngroundCloudpoints2 {
  1576. faultLabel = f(data, &pjisuvParam)
  1577. if faultLabel != "" {
  1578. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1579. lastGlobalTriggerTime = thisGlobalTriggerTime
  1580. thisGlobalTriggerTime = time.Now()
  1581. subscribersTimes[i] = thisGlobalTriggerTime
  1582. goto TriggerSuccess
  1583. }
  1584. }
  1585. }
  1586. TriggerSuccess:
  1587. subscribersMutexes[i].Unlock()
  1588. }
  1589. subscribersTimeMutexes[i].Unlock()
  1590. },
  1591. })
  1592. }
  1593. // 35
  1594. if topic == masterConfig.TopicOfCameraImage && (len(masterConfig.RuleOfCameraImage1) > 0 || len(masterConfig.RuleOfCameraImage2) > 0) {
  1595. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1596. Node: commonConfig.RosNode,
  1597. Topic: topic,
  1598. Callback: func(data *sensor_msgs.Image) {
  1599. subscribersTimeMutexes[i].Lock()
  1600. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1601. subscribersMutexes[i].Lock()
  1602. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1603. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1604. faultLabel := ""
  1605. if len(masterConfig.RuleOfCameraImage1) > 0 {
  1606. for _, f := range masterConfig.RuleOfCameraImage1 {
  1607. faultLabel = f(data)
  1608. if faultLabel != "" {
  1609. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1610. lastGlobalTriggerTime = thisGlobalTriggerTime
  1611. thisGlobalTriggerTime = time.Now()
  1612. subscribersTimes[i] = thisGlobalTriggerTime
  1613. goto TriggerSuccess
  1614. }
  1615. }
  1616. }
  1617. if len(masterConfig.RuleOfCameraImage2) > 0 {
  1618. for _, f := range masterConfig.RuleOfCameraImage2 {
  1619. faultLabel = f(data, &pjisuvParam)
  1620. if faultLabel != "" {
  1621. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1622. lastGlobalTriggerTime = thisGlobalTriggerTime
  1623. thisGlobalTriggerTime = time.Now()
  1624. subscribersTimes[i] = thisGlobalTriggerTime
  1625. goto TriggerSuccess
  1626. }
  1627. }
  1628. }
  1629. TriggerSuccess:
  1630. subscribersMutexes[i].Unlock()
  1631. }
  1632. subscribersTimeMutexes[i].Unlock()
  1633. },
  1634. })
  1635. }
  1636. // 36
  1637. if topic == masterConfig.TopicOfDataRead && (len(masterConfig.RuleOfDataRead1) > 0 || len(masterConfig.RuleOfDataRead2) > 0) {
  1638. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1639. Node: commonConfig.RosNode,
  1640. Topic: topic,
  1641. Callback: func(data *pjisuv_msgs.Retrieval) {
  1642. // 更新共享变量
  1643. mutexOfDataRead.RLock()
  1644. {
  1645. pjisuvParam.NumCountDataReadOfDataRead++
  1646. if pjisuvParam.NumCountDataReadOfDataRead == 10 {
  1647. pjisuvParam.EgoSteeringRealOfDataRead = append(pjisuvParam.EgoSteeringRealOfDataRead, data.ActStrWhAng)
  1648. pjisuvParam.EgoThrottleRealOfDataRead = append(pjisuvParam.EgoThrottleRealOfDataRead, data.AccPed2)
  1649. pjisuvParam.NumCountDataReadOfDataRead = 0
  1650. }
  1651. pjisuvParam.StrgAngleRealValueOfDataRead = data.ActStrWhAng
  1652. }
  1653. mutexOfDataRead.RUnlock()
  1654. subscribersTimeMutexes[i].Lock()
  1655. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1656. subscribersMutexes[i].Lock()
  1657. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1658. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1659. faultLabel := ""
  1660. if len(masterConfig.RuleOfDataRead1) > 0 {
  1661. for _, f := range masterConfig.RuleOfDataRead1 {
  1662. faultLabel = f(data)
  1663. if faultLabel != "" {
  1664. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1665. lastGlobalTriggerTime = thisGlobalTriggerTime
  1666. thisGlobalTriggerTime = time.Now()
  1667. subscribersTimes[i] = thisGlobalTriggerTime
  1668. goto TriggerSuccess
  1669. }
  1670. }
  1671. }
  1672. if len(masterConfig.RuleOfDataRead2) > 0 {
  1673. for _, f := range masterConfig.RuleOfDataRead2 {
  1674. faultLabel = f(data, &pjisuvParam)
  1675. if faultLabel != "" {
  1676. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1677. lastGlobalTriggerTime = thisGlobalTriggerTime
  1678. thisGlobalTriggerTime = time.Now()
  1679. subscribersTimes[i] = thisGlobalTriggerTime
  1680. goto TriggerSuccess
  1681. }
  1682. }
  1683. }
  1684. TriggerSuccess:
  1685. subscribersMutexes[i].Unlock()
  1686. }
  1687. subscribersTimeMutexes[i].Unlock()
  1688. },
  1689. })
  1690. }
  1691. // 37
  1692. if topic == masterConfig.TopicOfPjiGps && (len(masterConfig.RuleOfPjiGps1) > 0 || len(masterConfig.RuleOfPjiGps2) > 0) {
  1693. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1694. Node: commonConfig.RosNode,
  1695. Topic: topic,
  1696. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  1697. subscribersTimeMutexes[i].Lock()
  1698. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1699. subscribersMutexes[i].Lock()
  1700. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1701. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1702. faultLabel := ""
  1703. if len(masterConfig.RuleOfPjiGps1) > 0 {
  1704. for _, f := range masterConfig.RuleOfPjiGps1 {
  1705. faultLabel = f(data)
  1706. if faultLabel != "" {
  1707. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1708. lastGlobalTriggerTime = thisGlobalTriggerTime
  1709. thisGlobalTriggerTime = time.Now()
  1710. subscribersTimes[i] = thisGlobalTriggerTime
  1711. goto TriggerSuccess
  1712. }
  1713. }
  1714. }
  1715. if len(masterConfig.RuleOfPjiGps2) > 0 {
  1716. for _, f := range masterConfig.RuleOfPjiGps2 {
  1717. faultLabel = f(data, &pjisuvParam)
  1718. if faultLabel != "" {
  1719. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1720. lastGlobalTriggerTime = thisGlobalTriggerTime
  1721. thisGlobalTriggerTime = time.Now()
  1722. subscribersTimes[i] = thisGlobalTriggerTime
  1723. goto TriggerSuccess
  1724. }
  1725. }
  1726. }
  1727. TriggerSuccess:
  1728. subscribersMutexes[i].Unlock()
  1729. }
  1730. subscribersTimeMutexes[i].Unlock()
  1731. },
  1732. })
  1733. }
  1734. // 39
  1735. if topic == masterConfig.TopicOfPjVehicleFdbPub && (len(masterConfig.RuleOfPjVehicleFdbPub1) > 0 || len(masterConfig.RuleOfPjVehicleFdbPub2) > 0) {
  1736. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  1737. Node: commonConfig.RosNode,
  1738. Topic: topic,
  1739. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  1740. subscribersTimeMutexes[i].Lock()
  1741. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  1742. subscribersMutexes[i].Lock()
  1743. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  1744. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  1745. faultLabel := ""
  1746. if len(masterConfig.RuleOfPjVehicleFdbPub1) > 0 {
  1747. for _, f := range masterConfig.RuleOfPjVehicleFdbPub1 {
  1748. faultLabel = f(data)
  1749. if faultLabel != "" {
  1750. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1751. lastGlobalTriggerTime = thisGlobalTriggerTime
  1752. thisGlobalTriggerTime = time.Now()
  1753. subscribersTimes[i] = thisGlobalTriggerTime
  1754. goto TriggerSuccess
  1755. }
  1756. }
  1757. }
  1758. if len(masterConfig.RuleOfPjVehicleFdbPub2) > 0 {
  1759. for _, f := range masterConfig.RuleOfPjVehicleFdbPub2 {
  1760. faultLabel = f(data, &pjisuvParam)
  1761. if faultLabel != "" {
  1762. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  1763. lastGlobalTriggerTime = thisGlobalTriggerTime
  1764. thisGlobalTriggerTime = time.Now()
  1765. subscribersTimes[i] = thisGlobalTriggerTime
  1766. goto TriggerSuccess
  1767. }
  1768. }
  1769. }
  1770. TriggerSuccess:
  1771. subscribersMutexes[i].Unlock()
  1772. }
  1773. subscribersTimeMutexes[i].Unlock()
  1774. },
  1775. })
  1776. }
  1777. if err != nil {
  1778. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  1779. continue
  1780. }
  1781. }
  1782. select {
  1783. case signal := <-service.ChannelKillWindowProducer:
  1784. if signal == 1 {
  1785. commonConfig.RosNode.Close()
  1786. service.AddKillTimes("3")
  1787. return
  1788. }
  1789. }
  1790. }
  1791. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  1792. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  1793. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) { // 如果是不在旧故障窗口内,添加一个新窗口
  1794. exceptBegin := util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime)
  1795. earliestBegin := util.GetTimeString(lastGlobalTriggerTime) // 窗口最早时间不能早于上一次触发时间
  1796. finalTimeWindowBegin := ""
  1797. if util.TimeCustom1LessEqualThanTimeCustom2(exceptBegin, earliestBegin) {
  1798. finalTimeWindowBegin = earliestBegin
  1799. } else {
  1800. finalTimeWindowBegin = exceptBegin
  1801. }
  1802. newTimeWindow := commonEntity.TimeWindow{
  1803. FaultTime: faultHappenTime,
  1804. TimeWindowBegin: finalTimeWindowBegin,
  1805. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  1806. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  1807. Labels: []string{faultLabel},
  1808. MasterTopics: masterTopics,
  1809. SlaveTopics: slaveTopics,
  1810. }
  1811. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  1812. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  1813. } else { // 如果在旧故障窗口内
  1814. commonEntity.TimeWindowProducerQueueMutex.RLock()
  1815. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  1816. // 更新故障窗口end时间
  1817. latestEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime) // 窗口最晚关闭时间是窗口开启时间加上最大任务时间
  1818. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime) // 窗口期望关闭时间是触发时间加上后置时间
  1819. if util.TimeCustom1GreaterTimeCustom2(expectEnd, latestEnd) {
  1820. lastTimeWindow.TimeWindowEnd = latestEnd
  1821. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  1822. } else {
  1823. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  1824. lastTimeWindow.TimeWindowEnd = expectEnd
  1825. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  1826. }
  1827. }
  1828. // 更新label
  1829. labels := lastTimeWindow.Labels
  1830. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  1831. // 更新 topic
  1832. sourceMasterTopics := lastTimeWindow.MasterTopics
  1833. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  1834. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  1835. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  1836. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  1837. }
  1838. }
  1839. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  1840. // 获取所有需要采集的topic
  1841. var faultCodeTopics []string
  1842. for _, code := range commonConfig.CloudConfig.Triggers {
  1843. if code.Label == faultLabel {
  1844. faultCodeTopics = code.Topics
  1845. }
  1846. }
  1847. // 根据不同节点采集的topic进行分配采集
  1848. for _, acceptTopic := range faultCodeTopics {
  1849. for _, host := range commonConfig.CloudConfig.Hosts {
  1850. for _, topic := range host.Topics {
  1851. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  1852. masterTopics = append(masterTopics, acceptTopic)
  1853. }
  1854. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  1855. slaveTopics = append(slaveTopics, acceptTopic)
  1856. }
  1857. }
  1858. }
  1859. }
  1860. return masterTopics, slaveTopics
  1861. }