produce_window.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
  4. "cicv-data-closedloop/aarch64/kinglong/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/kinglong/master/package/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/kinglong_msgs"
  10. "github.com/bluenviron/goroslib/v2"
  11. "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
  12. "math"
  13. "sync"
  14. "time"
  15. )
  16. var (
  17. extendParam commonEntity.KinglongParam
  18. // /cicv_location
  19. mutexOfCicvLocation sync.RWMutex
  20. // /tpperception
  21. mutexOfTpperception sync.RWMutex
  22. // /pj_control_pub
  23. mutexOfJinlongControlPub sync.RWMutex
  24. // /data_read
  25. mutexOfDataRead sync.RWMutex
  26. )
  27. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  28. func PrepareTimeWindowProducerQueue() {
  29. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  30. var err error
  31. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  32. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  33. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  34. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  35. for i, topic := range commonConfig.SubscribeTopics {
  36. // !!!扩展的定时任务监听,牛逼!!!
  37. if topic == masterConfig.TopicOfCicvExtend {
  38. for {
  39. time.Sleep(time.Duration(3500) * time.Millisecond)
  40. for _, f := range masterConfig.RuleOfCicvExtend {
  41. label := f(extendParam)
  42. if label != "" {
  43. saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
  44. break
  45. }
  46. }
  47. }
  48. }
  49. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  50. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  51. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  52. Node: commonConfig.RosNode,
  53. Topic: topic,
  54. Callback: func(data *kinglong_msgs.PerceptionLocalization) {
  55. // 更新共享变量
  56. mutexOfCicvLocation.RLock()
  57. {
  58. extendParam.VelocityYOfCicvLocation = data.VelocityX
  59. extendParam.VelocityYOfCicvLocation = data.VelocityY
  60. extendParam.YawOfCicvLocation = data.Yaw
  61. extendParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
  62. }
  63. mutexOfCicvLocation.RUnlock()
  64. subscribersTimeMutexes[i].Lock()
  65. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  66. subscribersMutexes[i].Lock()
  67. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  68. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  69. var faultLabel string
  70. for _, f := range masterConfig.RuleOfCicvLocation {
  71. faultLabel = f(data)
  72. if faultLabel != "" {
  73. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  74. break
  75. }
  76. }
  77. subscribersMutexes[i].Unlock()
  78. }
  79. subscribersTimeMutexes[i].Unlock()
  80. },
  81. })
  82. }
  83. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  84. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  85. Node: commonConfig.RosNode,
  86. Topic: topic,
  87. Callback: func(data *kinglong_msgs.PerceptionObjects) {
  88. // ------- 触发前更新共享变量
  89. mutexOfTpperception.RLock()
  90. {
  91. for _, obj := range data.Objs {
  92. if (obj.Type != 1 && obj.Type != 0) || obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  93. continue
  94. }
  95. if _, ok := extendParam.ObjDicOfTpperception[obj.Id]; !ok {
  96. extendParam.ObjDicOfTpperception[obj.Id] = []float32{}
  97. }
  98. extendParam.ObjDicOfTpperception[obj.Id] = append(extendParam.ObjDicOfTpperception[obj.Id], obj.Y)
  99. }
  100. }
  101. mutexOfTpperception.RUnlock()
  102. subscribersTimeMutexes[i].Lock()
  103. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  104. subscribersMutexes[i].Lock()
  105. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  106. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  107. var faultLabel string
  108. for _, f := range masterConfig.RuleOfTpperception {
  109. faultLabel = f(data, extendParam)
  110. if faultLabel != "" {
  111. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  112. break
  113. }
  114. }
  115. subscribersMutexes[i].Unlock()
  116. }
  117. subscribersTimeMutexes[i].Unlock()
  118. // -------- 触发后更新共享变量
  119. mutexOfTpperception.RLock()
  120. {
  121. for _, obj := range data.Objs {
  122. extendParam.ObjTypeDicOfTpperception[obj.Id] = obj.Type
  123. extendParam.ObjSpeedDicOfTpperception[obj.Id] = math.Pow(math.Pow(float64(obj.Vxabs), 2)+math.Pow(float64(obj.Vyabs), 2), 0.5)
  124. }
  125. }
  126. mutexOfTpperception.RUnlock()
  127. },
  128. })
  129. }
  130. // 5
  131. if topic == masterConfig.TopicOfDataRead {
  132. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  133. Node: commonConfig.RosNode,
  134. Topic: topic,
  135. Callback: func(data *kinglong_msgs.Retrieval) {
  136. // 更新共享变量
  137. mutexOfDataRead.RLock()
  138. {
  139. extendParam.NumCountDataReadOfDataRead++
  140. if extendParam.NumCountDataReadOfDataRead == 10 {
  141. extendParam.EgoSteeringRealOfDataRead = append(extendParam.EgoSteeringRealOfDataRead, data.StrgAngleRealValue)
  142. extendParam.EgoThrottleRealOfDataRead = append(extendParam.EgoThrottleRealOfDataRead, data.VcuAccelPosValue)
  143. extendParam.EgoBrakeRealOfDataRead = append(extendParam.EgoBrakeRealOfDataRead, data.VcuBrkPelPosValue)
  144. extendParam.NumCountDataReadOfDataRead = 0
  145. }
  146. }
  147. mutexOfDataRead.RUnlock()
  148. subscribersTimeMutexes[i].Lock()
  149. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  150. subscribersMutexes[i].Lock()
  151. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  152. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  153. var faultLabel string
  154. for _, f := range masterConfig.RuleOfDataRead {
  155. faultLabel = f(data)
  156. if faultLabel != "" {
  157. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  158. subscribersTimes[i] = time.Now()
  159. break
  160. }
  161. }
  162. subscribersMutexes[i].Unlock()
  163. }
  164. subscribersTimeMutexes[i].Unlock()
  165. },
  166. })
  167. }
  168. // 6
  169. if topic == masterConfig.TopicOfJinlongControlPub && len(masterConfig.RuleOfJinlongControlPub) > 0 {
  170. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  171. Node: commonConfig.RosNode,
  172. Topic: topic,
  173. Callback: func(data *kinglong_msgs.JinlongControlCommand) {
  174. // 更新共享变量
  175. mutexOfJinlongControlPub.RLock()
  176. {
  177. extendParam.NumCountJinlongControlCommandOfPjControlPub++
  178. if extendParam.NumCountJinlongControlCommandOfPjControlPub == 10 {
  179. extendParam.EgoSteeringCmdOfJinlongControlPub = append(extendParam.EgoSteeringCmdOfJinlongControlPub, data.ASStrgAngleReq)
  180. extendParam.EgoThrottleCmdOfJinlongControlPub = append(extendParam.EgoThrottleCmdOfJinlongControlPub, data.ASAutoDAccelPosReq)
  181. extendParam.EgoBrakeCmdOfJinlongControlPub = append(extendParam.EgoBrakeCmdOfJinlongControlPub, data.ASAutoDBrkPelPosReq)
  182. extendParam.NumCountJinlongControlCommandOfPjControlPub = 0
  183. }
  184. }
  185. mutexOfJinlongControlPub.RUnlock()
  186. subscribersTimeMutexes[i].Lock()
  187. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  188. subscribersMutexes[i].Lock()
  189. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  190. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  191. var faultLabel string
  192. for _, f := range masterConfig.RuleOfJinlongControlPub {
  193. faultLabel = f(data)
  194. if faultLabel != "" {
  195. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  196. break
  197. }
  198. }
  199. subscribersMutexes[i].Unlock()
  200. }
  201. subscribersTimeMutexes[i].Unlock()
  202. },
  203. })
  204. }
  205. // 7
  206. if topic == masterConfig.TopicOfFailureLidar && len(masterConfig.RuleOfFailureLidar) > 0 {
  207. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  208. Node: commonConfig.RosNode,
  209. Topic: topic,
  210. Callback: func(data *std_msgs.Bool) {
  211. subscribersTimeMutexes[i].Lock()
  212. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  213. subscribersMutexes[i].Lock()
  214. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  215. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  216. var faultLabel string
  217. for _, f := range masterConfig.RuleOfFailureLidar {
  218. faultLabel = f(data)
  219. if faultLabel != "" {
  220. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  221. break
  222. }
  223. }
  224. subscribersMutexes[i].Unlock()
  225. }
  226. subscribersTimeMutexes[i].Unlock()
  227. },
  228. })
  229. }
  230. // 8
  231. if topic == masterConfig.TopicOfFailureRadar && len(masterConfig.RuleOfFailureRadar) > 0 {
  232. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  233. Node: commonConfig.RosNode,
  234. Topic: topic,
  235. Callback: func(data *std_msgs.Bool) {
  236. subscribersTimeMutexes[i].Lock()
  237. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  238. subscribersMutexes[i].Lock()
  239. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  240. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  241. var faultLabel string
  242. for _, f := range masterConfig.RuleOfFailureRadar {
  243. faultLabel = f(data)
  244. if faultLabel != "" {
  245. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  246. break
  247. }
  248. }
  249. subscribersMutexes[i].Unlock()
  250. }
  251. subscribersTimeMutexes[i].Unlock()
  252. },
  253. })
  254. }
  255. // 9
  256. if topic == masterConfig.TopicOfFailureCamera && len(masterConfig.RuleOfFailureLidar) > 0 {
  257. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  258. Node: commonConfig.RosNode,
  259. Topic: topic,
  260. Callback: func(data *std_msgs.Bool) {
  261. subscribersTimeMutexes[i].Lock()
  262. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  263. subscribersMutexes[i].Lock()
  264. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  265. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  266. var faultLabel string
  267. for _, f := range masterConfig.RuleOfFailureLidar {
  268. faultLabel = f(data)
  269. if faultLabel != "" {
  270. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  271. break
  272. }
  273. }
  274. subscribersMutexes[i].Unlock()
  275. }
  276. subscribersTimeMutexes[i].Unlock()
  277. },
  278. })
  279. }
  280. if err != nil {
  281. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  282. //TODO 如何回传日志
  283. continue
  284. }
  285. }
  286. select {
  287. case signal := <-service.ChannelKillWindowProducer:
  288. if signal == 1 {
  289. commonConfig.RosNode.Close()
  290. service.AddKillTimes("3")
  291. return
  292. }
  293. }
  294. }
  295. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  296. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  297. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  298. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  299. newTimeWindow := commonEntity.TimeWindow{
  300. FaultTime: faultHappenTime,
  301. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  302. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  303. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  304. Labels: []string{faultLabel},
  305. MasterTopics: masterTopics,
  306. SlaveTopics: slaveTopics,
  307. }
  308. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  309. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  310. } else {
  311. // 2-2 如果在旧故障窗口内
  312. commonEntity.TimeWindowProducerQueueMutex.RLock()
  313. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  314. // 2-2-1 更新故障窗口end时间
  315. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  316. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  317. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  318. lastTimeWindow.TimeWindowEnd = maxEnd
  319. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  320. } else {
  321. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  322. lastTimeWindow.TimeWindowEnd = expectEnd
  323. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  324. }
  325. }
  326. // 2-2-2 更新label
  327. labels := lastTimeWindow.Labels
  328. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  329. // 2-2-3 更新 topic
  330. sourceMasterTopics := lastTimeWindow.MasterTopics
  331. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  332. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  333. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  334. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  335. }
  336. }
  337. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  338. // 获取所有需要采集的topic
  339. var faultCodeTopics []string
  340. for _, code := range commonConfig.CloudConfig.Triggers {
  341. if code.Label == faultLabel {
  342. faultCodeTopics = code.Topics
  343. }
  344. }
  345. // 根据不同节点采集的topic进行分配采集
  346. for _, acceptTopic := range faultCodeTopics {
  347. for _, host := range commonConfig.CloudConfig.Hosts {
  348. for _, topic := range host.Topics {
  349. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  350. masterTopics = append(masterTopics, acceptTopic)
  351. }
  352. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  353. slaveTopics = append(slaveTopics, acceptTopic)
  354. }
  355. }
  356. }
  357. }
  358. return masterTopics, slaveTopics
  359. }