produce_window.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
  4. "cicv-data-closedloop/aarch64/kinglong/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/kinglong/master/package/config"
  6. "cicv-data-closedloop/common/config/c_log"
  7. commonEntity "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/kinglong_msgs"
  10. "github.com/bluenviron/goroslib/v2"
  11. "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
  12. "math"
  13. "sync"
  14. "time"
  15. )
  16. var (
  17. extendParam commonEntity.KinglongParam
  18. // /cicv_location
  19. mutexOfCicvLocation sync.RWMutex
  20. // /tpperception
  21. mutexOfTpperception sync.RWMutex
  22. // /pj_control_pub
  23. mutexOfJinlongControlPub sync.RWMutex
  24. // /data_read
  25. mutexOfDataRead sync.RWMutex
  26. )
  27. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  28. func PrepareTimeWindowProducerQueue() {
  29. c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
  30. var err error
  31. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  32. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  33. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  34. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  35. for i, topic := range commonConfig.SubscribeTopics {
  36. // !!!扩展的定时任务监听,牛逼!!!
  37. if topic == masterConfig.TopicOfCicvExtend {
  38. for {
  39. time.Sleep(time.Duration(3500) * time.Millisecond)
  40. for _, f := range masterConfig.RuleOfCicvExtend {
  41. label := f(extendParam)
  42. if label != "" {
  43. saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
  44. break
  45. }
  46. }
  47. }
  48. }
  49. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  50. if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
  51. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  52. Node: commonConfig.RosNode,
  53. Topic: topic,
  54. Callback: func(data *kinglong_msgs.PerceptionLocalization) {
  55. // 更新共享变量
  56. mutexOfCicvLocation.RLock()
  57. {
  58. extendParam.VelocityYOfCicvLocation = data.VelocityX
  59. extendParam.VelocityYOfCicvLocation = data.VelocityY
  60. extendParam.YawOfCicvLocation = data.Yaw
  61. extendParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
  62. }
  63. mutexOfCicvLocation.RUnlock()
  64. subscribersTimeMutexes[i].Lock()
  65. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  66. subscribersMutexes[i].Lock()
  67. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  68. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  69. var faultLabel string
  70. for _, f := range masterConfig.RuleOfCicvLocation {
  71. faultLabel = f(data)
  72. if faultLabel != "" {
  73. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  74. break
  75. }
  76. }
  77. subscribersMutexes[i].Unlock()
  78. }
  79. subscribersTimeMutexes[i].Unlock()
  80. },
  81. })
  82. }
  83. if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
  84. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  85. Node: commonConfig.RosNode,
  86. Topic: topic,
  87. Callback: func(data *kinglong_msgs.PerceptionObjects) {
  88. // 更新共享变量
  89. mutexOfTpperception.RLock()
  90. {
  91. for _, obj := range data.Objs {
  92. if (obj.Type != 1 && obj.Type != 0) || obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
  93. continue
  94. }
  95. if _, ok := extendParam.ObjDicOfTpperception[obj.Id]; !ok {
  96. extendParam.ObjDicOfTpperception[obj.Id] = []float32{}
  97. }
  98. extendParam.ObjDicOfTpperception[obj.Id] = append(extendParam.ObjDicOfTpperception[obj.Id], obj.Y)
  99. }
  100. }
  101. mutexOfTpperception.RUnlock()
  102. subscribersTimeMutexes[i].Lock()
  103. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  104. subscribersMutexes[i].Lock()
  105. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  106. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  107. var faultLabel string
  108. for _, f := range masterConfig.RuleOfTpperception {
  109. faultLabel = f(data, extendParam)
  110. if faultLabel != "" {
  111. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  112. break
  113. }
  114. }
  115. subscribersMutexes[i].Unlock()
  116. }
  117. subscribersTimeMutexes[i].Unlock()
  118. },
  119. })
  120. }
  121. // 5
  122. if topic == masterConfig.TopicOfDataRead {
  123. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  124. Node: commonConfig.RosNode,
  125. Topic: topic,
  126. Callback: func(data *kinglong_msgs.Retrieval) {
  127. // 更新共享变量
  128. mutexOfDataRead.RLock()
  129. {
  130. extendParam.NumCountDataReadOfDataRead++
  131. if extendParam.NumCountDataReadOfDataRead == 10 {
  132. extendParam.EgoSteeringRealOfDataRead = append(extendParam.EgoSteeringRealOfDataRead, data.StrgAngleRealValue)
  133. extendParam.EgoThrottleRealOfDataRead = append(extendParam.EgoThrottleRealOfDataRead, data.VcuAccelPosValue)
  134. extendParam.EgoBrakeRealOfDataRead = append(extendParam.EgoBrakeRealOfDataRead, data.VcuBrkPelPosValue)
  135. extendParam.NumCountDataReadOfDataRead = 0
  136. }
  137. }
  138. mutexOfDataRead.RUnlock()
  139. subscribersTimeMutexes[i].Lock()
  140. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  141. subscribersMutexes[i].Lock()
  142. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  143. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  144. var faultLabel string
  145. for _, f := range masterConfig.RuleOfDataRead {
  146. faultLabel = f(data)
  147. if faultLabel != "" {
  148. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  149. subscribersTimes[i] = time.Now()
  150. break
  151. }
  152. }
  153. subscribersMutexes[i].Unlock()
  154. }
  155. subscribersTimeMutexes[i].Unlock()
  156. },
  157. })
  158. }
  159. // 6
  160. if topic == masterConfig.TopicOfJinlongControlPub && len(masterConfig.RuleOfJinlongControlPub) > 0 {
  161. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  162. Node: commonConfig.RosNode,
  163. Topic: topic,
  164. Callback: func(data *kinglong_msgs.JinlongControlCommand) {
  165. // 更新共享变量
  166. mutexOfJinlongControlPub.RLock()
  167. {
  168. extendParam.NumCountJinlongControlCommandOfPjControlPub++
  169. if extendParam.NumCountJinlongControlCommandOfPjControlPub == 10 {
  170. extendParam.EgoSteeringCmdOfJinlongControlPub = append(extendParam.EgoSteeringCmdOfJinlongControlPub, data.ASStrgAngleReq)
  171. extendParam.EgoThrottleCmdOfJinlongControlPub = append(extendParam.EgoThrottleCmdOfJinlongControlPub, data.ASAutoDAccelPosReq)
  172. extendParam.EgoBrakeCmdOfJinlongControlPub = append(extendParam.EgoBrakeCmdOfJinlongControlPub, data.ASAutoDBrkPelPosReq)
  173. extendParam.NumCountJinlongControlCommandOfPjControlPub = 0
  174. }
  175. }
  176. mutexOfJinlongControlPub.RUnlock()
  177. subscribersTimeMutexes[i].Lock()
  178. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  179. subscribersMutexes[i].Lock()
  180. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  181. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  182. var faultLabel string
  183. for _, f := range masterConfig.RuleOfJinlongControlPub {
  184. faultLabel = f(data)
  185. if faultLabel != "" {
  186. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  187. break
  188. }
  189. }
  190. subscribersMutexes[i].Unlock()
  191. }
  192. subscribersTimeMutexes[i].Unlock()
  193. },
  194. })
  195. }
  196. // 7
  197. if topic == masterConfig.TopicOfFailureLidar && len(masterConfig.RuleOfFailureLidar) > 0 {
  198. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  199. Node: commonConfig.RosNode,
  200. Topic: topic,
  201. Callback: func(data *std_msgs.Bool) {
  202. subscribersTimeMutexes[i].Lock()
  203. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  204. subscribersMutexes[i].Lock()
  205. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  206. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  207. var faultLabel string
  208. for _, f := range masterConfig.RuleOfFailureLidar {
  209. faultLabel = f(data)
  210. if faultLabel != "" {
  211. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  212. break
  213. }
  214. }
  215. subscribersMutexes[i].Unlock()
  216. }
  217. subscribersTimeMutexes[i].Unlock()
  218. },
  219. })
  220. }
  221. // 8
  222. if topic == masterConfig.TopicOfFailureRadar && len(masterConfig.RuleOfFailureRadar) > 0 {
  223. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  224. Node: commonConfig.RosNode,
  225. Topic: topic,
  226. Callback: func(data *std_msgs.Bool) {
  227. subscribersTimeMutexes[i].Lock()
  228. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  229. subscribersMutexes[i].Lock()
  230. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  231. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  232. var faultLabel string
  233. for _, f := range masterConfig.RuleOfFailureRadar {
  234. faultLabel = f(data)
  235. if faultLabel != "" {
  236. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  237. break
  238. }
  239. }
  240. subscribersMutexes[i].Unlock()
  241. }
  242. subscribersTimeMutexes[i].Unlock()
  243. },
  244. })
  245. }
  246. // 9
  247. if topic == masterConfig.TopicOfFailureCamera && len(masterConfig.RuleOfFailureLidar) > 0 {
  248. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  249. Node: commonConfig.RosNode,
  250. Topic: topic,
  251. Callback: func(data *std_msgs.Bool) {
  252. subscribersTimeMutexes[i].Lock()
  253. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  254. subscribersMutexes[i].Lock()
  255. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  256. lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
  257. var faultLabel string
  258. for _, f := range masterConfig.RuleOfFailureLidar {
  259. faultLabel = f(data)
  260. if faultLabel != "" {
  261. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  262. break
  263. }
  264. }
  265. subscribersMutexes[i].Unlock()
  266. }
  267. subscribersTimeMutexes[i].Unlock()
  268. },
  269. })
  270. }
  271. if err != nil {
  272. c_log.GlobalLogger.Info("创建订阅者报错:", err)
  273. //TODO 如何回传日志
  274. continue
  275. }
  276. }
  277. select {
  278. case signal := <-service.ChannelKillWindowProducer:
  279. if signal == 1 {
  280. commonConfig.RosNode.Close()
  281. service.AddKillTimes("3")
  282. return
  283. }
  284. }
  285. }
  286. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
  287. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  288. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  289. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  290. newTimeWindow := commonEntity.TimeWindow{
  291. FaultTime: faultHappenTime,
  292. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  293. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  294. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  295. Labels: []string{faultLabel},
  296. MasterTopics: masterTopics,
  297. SlaveTopics: slaveTopics,
  298. }
  299. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  300. commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  301. } else {
  302. // 2-2 如果在旧故障窗口内
  303. commonEntity.TimeWindowProducerQueueMutex.RLock()
  304. defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
  305. // 2-2-1 更新故障窗口end时间
  306. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  307. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  308. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  309. lastTimeWindow.TimeWindowEnd = maxEnd
  310. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  311. } else {
  312. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  313. lastTimeWindow.TimeWindowEnd = expectEnd
  314. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  315. }
  316. }
  317. // 2-2-2 更新label
  318. labels := lastTimeWindow.Labels
  319. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  320. // 2-2-3 更新 topic
  321. sourceMasterTopics := lastTimeWindow.MasterTopics
  322. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  323. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  324. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  325. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  326. }
  327. }
  328. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  329. // 获取所有需要采集的topic
  330. var faultCodeTopics []string
  331. for _, code := range commonConfig.CloudConfig.Triggers {
  332. if code.Label == faultLabel {
  333. faultCodeTopics = code.Topics
  334. }
  335. }
  336. // 根据不同节点采集的topic进行分配采集
  337. for _, acceptTopic := range faultCodeTopics {
  338. for _, host := range commonConfig.CloudConfig.Hosts {
  339. for _, topic := range host.Topics {
  340. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  341. masterTopics = append(masterTopics, acceptTopic)
  342. }
  343. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  344. slaveTopics = append(slaveTopics, acceptTopic)
  345. }
  346. }
  347. }
  348. }
  349. return masterTopics, slaveTopics
  350. }