produce_window.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pji/common/config"
  4. commonService "cicv-data-closedloop/aarch64/pji/common/service"
  5. masterConfig "cicv-data-closedloop/aarch64/pji/master/package/config"
  6. "cicv-data-closedloop/amd64/dispatch_server/package/util"
  7. "cicv-data-closedloop/common/config/c_log"
  8. "cicv-data-closedloop/common/entity"
  9. commonUtil "cicv-data-closedloop/common/util"
  10. "encoding/json"
  11. "github.com/bluenviron/goroslib/v2"
  12. "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
  13. "sync"
  14. "time"
  15. )
  16. // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
  17. func PrepareTimeWindowProducerQueue() {
  18. var err error
  19. subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
  20. subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
  21. subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  22. subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
  23. for i, topic := range commonConfig.SubscribeTopics {
  24. c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
  25. if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
  26. subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
  27. Node: commonConfig.RosNode,
  28. Topic: topic,
  29. Callback: func(data *std_msgs.UInt8) {
  30. subscribersTimeMutexes[i].Lock()
  31. if time.Since(subscribersTimes[i]).Seconds() > 1 {
  32. subscribersMutexes[i].Lock()
  33. faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
  34. lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
  35. var faultLabel string
  36. for _, f := range masterConfig.RuleOfObstacleDetection {
  37. faultLabel = f(data)
  38. if faultLabel != "" {
  39. if canCollect() {
  40. saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
  41. subscribersTimes[i] = time.Now()
  42. break
  43. }
  44. }
  45. }
  46. subscribersMutexes[i].Unlock()
  47. }
  48. subscribersTimeMutexes[i].Unlock()
  49. },
  50. })
  51. }
  52. if err != nil {
  53. c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
  54. //TODO 如何回传日志
  55. continue
  56. }
  57. }
  58. select {
  59. case signal := <-commonService.ChannelKillSubscriber:
  60. if signal == 1 {
  61. commonConfig.RosNode.Close()
  62. commonService.AddKillTimes("3")
  63. return
  64. }
  65. }
  66. }
  67. func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) {
  68. masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
  69. if lastTimeWindow == nil || commonUtil.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
  70. // 2-1 如果是不在旧故障窗口内,添加一个新窗口
  71. newTimeWindow := entity.TimeWindow{
  72. FaultTime: faultHappenTime,
  73. TimeWindowBegin: commonUtil.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
  74. TimeWindowEnd: commonUtil.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
  75. Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
  76. Labels: []string{faultLabel},
  77. MasterTopics: masterTopics,
  78. SlaveTopics: slaveTopics,
  79. }
  80. c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
  81. entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
  82. } else {
  83. // 2-2 如果在旧故障窗口内
  84. entity.TimeWindowProducerQueueMutex.RLock()
  85. defer entity.TimeWindowProducerQueueMutex.RUnlock()
  86. // 2-2-1 更新故障窗口end时间
  87. maxEnd := commonUtil.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
  88. expectEnd := commonUtil.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
  89. if commonUtil.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
  90. lastTimeWindow.TimeWindowEnd = maxEnd
  91. lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
  92. } else {
  93. if commonUtil.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
  94. lastTimeWindow.TimeWindowEnd = expectEnd
  95. lastTimeWindow.Length = commonUtil.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
  96. }
  97. }
  98. // 2-2-2 更新label
  99. labels := lastTimeWindow.Labels
  100. lastTimeWindow.Labels = commonUtil.AppendIfNotExists(labels, faultLabel)
  101. // 2-2-3 更新 topic
  102. sourceMasterTopics := lastTimeWindow.MasterTopics
  103. lastTimeWindow.MasterTopics = commonUtil.MergeSlice(sourceMasterTopics, masterTopics)
  104. sourceSlaveTopics := lastTimeWindow.SlaveTopics
  105. lastTimeWindow.SlaveTopics = commonUtil.MergeSlice(sourceSlaveTopics, slaveTopics)
  106. c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
  107. }
  108. }
  109. func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
  110. // 获取所有需要采集的topic
  111. var faultCodeTopics []string
  112. for _, code := range commonConfig.CloudConfig.Triggers {
  113. if code.Label == faultLabel {
  114. faultCodeTopics = code.Topics
  115. }
  116. }
  117. return faultCodeTopics, nil
  118. }
  119. // 判断采集包数量是否超过限额
  120. func canCollect() bool {
  121. responseString, err := commonUtil.HttpPostJsonWithHeaders(
  122. commonConfig.CloudConfig.CollectLimit.Url,
  123. map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
  124. map[string]string{
  125. "snCode": commonConfig.LocalConfig.SecretKey,
  126. "collectLimitDay": util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
  127. "collectLimitWeek": util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
  128. "collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
  129. "collectLimitYear": util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
  130. },
  131. )
  132. if err != nil {
  133. c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
  134. return false
  135. }
  136. // 解析JSON字符串到Response结构体
  137. var resp entity.Response
  138. err = json.Unmarshal([]byte(responseString), &resp)
  139. if err != nil {
  140. c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
  141. return false
  142. }
  143. if resp.Code != 200 { // 不是200 代表不允许采集
  144. c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。")
  145. return false
  146. }
  147. return true
  148. }