// Source file: produce_window.go

  1. package svc
  2. import (
  3. "cicv-data-closedloop/common/config/c_log"
  4. commonConfig "cicv-data-closedloop/kinglong/common/cfg"
  5. "cicv-data-closedloop/kinglong/common/ent"
  6. "cicv-data-closedloop/kinglong/common/global"
  7. "cicv-data-closedloop/kinglong/common/log"
  8. "cicv-data-closedloop/kinglong/common/svc"
  9. "cicv-data-closedloop/kinglong/common/util"
  10. masterConfig "cicv-data-closedloop/kinglong/master/pkg/cfg"
  11. "cicv-data-closedloop/kinglong_msgs"
  12. "github.com/bluenviron/goroslib/v2"
  13. "os"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. subscriber0Mutex sync.Mutex
  19. subscriber1Mutex sync.Mutex
  20. subscriber2Mutex sync.Mutex
  21. subscriber3Mutex sync.Mutex
  22. subscriber4Mutex sync.Mutex
  23. m sync.RWMutex
  24. velocityX float64
  25. velocityY float64
  26. yaw float64
  27. )
  28. // TODO 参考pjisuv修改
  29. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  30. func PrepareTimeWindowProducerQueue() {
  31. log.GlobalLogger.Info("订阅者 goroutine,启动。")
  32. //创建订阅者0订阅主题 nodefault_info
  33. log.GlobalLogger.Info("创建订阅者0订阅话题 ", masterConfig.TopicOfNodeFaultInfo)
  34. subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  35. Node: commonConfig.RosNode,
  36. Topic: masterConfig.TopicOfNodeFaultInfo,
  37. Callback: func(data *kinglong_msgs.FaultInfo) {
  38. if len(masterConfig.RuleOfNodefaultInfo) == 0 {
  39. //log.GlobalLogger.Info("话题 nodefault_info没有触发器")
  40. return
  41. }
  42. global.Subscriber0TimeMutex.Lock()
  43. // 判断是否是连续故障码
  44. gap := time.Since(global.Subscriber0Time).Seconds()
  45. if gap < 2 {
  46. global.Subscriber0Time = time.Now()
  47. global.Subscriber0TimeMutex.Unlock()
  48. return
  49. } else {
  50. // 2 不是连续故障码
  51. global.Subscriber0Time = time.Now()
  52. global.Subscriber0TimeMutex.Unlock()
  53. subscriber0Mutex.Lock()
  54. {
  55. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  56. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  57. var faultLabel string
  58. for _, f := range masterConfig.RuleOfNodefaultInfo {
  59. faultLabel = f(data)
  60. if faultLabel != "" {
  61. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  62. break
  63. }
  64. }
  65. }
  66. subscriber0Mutex.Unlock()
  67. }
  68. }})
  69. if err != nil {
  70. log.GlobalLogger.Info("创建订阅者0发生故障:", err)
  71. os.Exit(-1)
  72. }
  73. // 创建订阅者1订阅主题 cicv_location
  74. log.GlobalLogger.Info("创建订阅者1订阅话题 ", masterConfig.TopicOfCicvLocation)
  75. subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  76. Node: commonConfig.RosNode,
  77. Topic: masterConfig.TopicOfCicvLocation,
  78. Callback: func(data *kinglong_msgs.PerceptionLocalization) {
  79. if len(masterConfig.RuleOfCicvLocation) == 0 {
  80. log.GlobalLogger.Info("话题 cicv_location 没有触发器")
  81. return
  82. }
  83. subscriber1Mutex.Lock()
  84. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  85. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  86. // 更新共享变量
  87. m.RLock()
  88. velocityX = data.VelocityX
  89. velocityY = data.VelocityY
  90. yaw = data.Yaw
  91. m.RUnlock()
  92. var faultLabel string
  93. for _, f := range masterConfig.RuleOfCicvLocation {
  94. faultLabel = f(data)
  95. if faultLabel != "" {
  96. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  97. break
  98. }
  99. }
  100. subscriber1Mutex.Unlock()
  101. },
  102. })
  103. if err != nil {
  104. log.GlobalLogger.Info("创建订阅者1发生故障:", err)
  105. os.Exit(-1)
  106. }
  107. // 创建订阅者2订阅主题 tpperception
  108. log.GlobalLogger.Info("创建订阅者2订阅话题 ", masterConfig.TopicOfTpperception)
  109. subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  110. Node: commonConfig.RosNode,
  111. Topic: masterConfig.TopicOfTpperception,
  112. Callback: func(data *kinglong_msgs.PerceptionObjects) {
  113. if len(masterConfig.RuleOfTpperception) == 0 {
  114. log.GlobalLogger.Info("话题 tpperception 没有触发器")
  115. return
  116. }
  117. global.Subscriber2TimeMutex.Lock()
  118. // 判断是否是连续故障码
  119. gap := time.Since(global.Subscriber2Time).Seconds()
  120. if gap > 1 {
  121. // 2 不是连续故障码
  122. subscriber2Mutex.Lock()
  123. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  124. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  125. var faultLabel string
  126. //log.GlobalLogger.Infof("TTC数据为:【velocityX】=%v,【velocityY】=%v", velocityX, velocityY)
  127. for _, f := range masterConfig.RuleOfTpperception {
  128. faultLabel = f(data, velocityX, velocityY, yaw)
  129. if faultLabel != "" {
  130. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  131. global.Subscriber2Time = time.Now()
  132. break
  133. }
  134. }
  135. subscriber2Mutex.Unlock()
  136. }
  137. global.Subscriber2TimeMutex.Unlock()
  138. }})
  139. if err != nil {
  140. c_log.GlobalLogger.Info("创建订阅者2发生故障:", err)
  141. os.Exit(-1)
  142. }
  143. // 创建订阅者3订阅主题 fault_info
  144. log.GlobalLogger.Info("创建订阅者3订阅话题 ", masterConfig.TopicOfFaultInfo)
  145. subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  146. Node: commonConfig.RosNode,
  147. Topic: masterConfig.TopicOfFaultInfo,
  148. Callback: func(data *kinglong_msgs.FaultVec) {
  149. if len(masterConfig.RuleOfFaultInfo) == 0 {
  150. log.GlobalLogger.Info("话题 fault_info 没有触发器")
  151. return
  152. }
  153. global.Subscriber3TimeMutex.Lock()
  154. // 判断是否是连续故障码
  155. gap := time.Since(global.Subscriber3Time).Seconds()
  156. if gap > 2 {
  157. // 2 不是连续故障码
  158. global.Subscriber3Time = time.Now()
  159. global.Subscriber3TimeMutex.Unlock()
  160. subscriber3Mutex.Lock()
  161. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  162. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  163. var faultLabel string
  164. for _, f := range masterConfig.RuleOfFaultInfo {
  165. faultLabel = f(data)
  166. if faultLabel != "" {
  167. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  168. break
  169. }
  170. }
  171. subscriber3Mutex.Unlock()
  172. }
  173. global.Subscriber3TimeMutex.Unlock()
  174. }})
  175. if err != nil {
  176. log.GlobalLogger.Info("创建订阅者3发生故障:", err)
  177. os.Exit(-1)
  178. }
  179. // 创建订阅者4订阅主题 data_read
  180. // TODO 高频率触发
  181. log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead)
  182. subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
  183. Node: commonConfig.RosNode,
  184. Topic: masterConfig.TopicOfDataRead,
  185. Callback: func(data *kinglong_msgs.Retrieval) {
  186. if len(masterConfig.RuleOfDataRead) == 0 {
  187. //log.GlobalLogger.Info("话题 data_read 没有触发器")
  188. return
  189. }
  190. subscriber4Mutex.Lock()
  191. faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
  192. lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
  193. var faultLabel string
  194. for _, f := range masterConfig.RuleOfDataRead {
  195. faultLabel = f(data)
  196. if faultLabel != "" {
  197. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  198. break
  199. }
  200. }
  201. subscriber4Mutex.Unlock()
  202. },
  203. })
  204. if err != nil {
  205. log.GlobalLogger.Info("创建订阅者3发生故障:", err)
  206. os.Exit(-1)
  207. }
  208. select {
  209. case signal := <-svc.ChannelKillWindowProducer:
  210. if signal == 1 {
  211. defer svc.AddKillTimes("3")
  212. subscriber0.Close()
  213. subscriber1.Close()
  214. subscriber2.Close()
  215. subscriber3.Close()
  216. subscriber4.Close()
  217. commonConfig.RosNode.Close()
  218. return
  219. }
  220. }
  221. }
  222. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *ent.TimeWindow) {
  223. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  224. if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  225. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  226. newTimeWindow := ent.TimeWindow{
  227. FaultTime: faultHappenTime,
  228. TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  229. TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  230. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  231. Labels: []string{faultLabel},
  232. MasterTopics: masterTopics,
  233. SlaveTopics: slaveTopics,
  234. }
  235. log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  236. util.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  237. } else {
  238. // 2-2 如果在旧故障窗口内
  239. global.TimeWindowProducerQueueMutex.RLock()
  240. defer global.TimeWindowProducerQueueMutex.RUnlock()
  241. // 2-2-1 更新故障窗口end时间
  242. maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  243. expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  244. if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  245. lastTimeWindow.TimeWindowEnd = maxEnd
  246. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  247. } else {
  248. if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  249. lastTimeWindow.TimeWindowEnd = expectEnd
  250. lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  251. }
  252. }
  253. // 2-2-2 更新label
  254. labels := lastTimeWindow.Labels
  255. lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
  256. // 2-2-3 更新 topic
  257. sourceMasterTopics := lastTimeWindow.MasterTopics
  258. lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
  259. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  260. lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
  261. log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  262. }
  263. }
  264. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  265. // 获取所有需要采集的topic
  266. var faultCodeTopics []string
  267. for _, code := range commonConfig.CloudConfig.Triggers {
  268. if code.Label == faultLabel {
  269. faultCodeTopics = code.Topics
  270. }
  271. }
  272. // 根据不同节点采集的topic进行分配采集
  273. for _, acceptTopic := range faultCodeTopics {
  274. for _, host := range commonConfig.CloudConfig.Hosts {
  275. for _, topic := range host.Topics {
  276. if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
  277. masterTopics = append(masterTopics, acceptTopic)
  278. }
  279. if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
  280. slaveTopics = append(slaveTopics, acceptTopic)
  281. }
  282. }
  283. }
  284. }
  285. return masterTopics, slaveTopics
  286. }