123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- package service
- import (
- commonConfig "cicv-data-closedloop/aarch64/pji/common/config"
- commonService "cicv-data-closedloop/aarch64/pji/common/service"
- masterConfig "cicv-data-closedloop/aarch64/pji/master/package/config"
- "cicv-data-closedloop/common/config/c_log"
- "cicv-data-closedloop/common/entity"
- "cicv-data-closedloop/common/util"
- commonUtil "cicv-data-closedloop/common/util"
- "encoding/json"
- "github.com/bluenviron/goroslib/v2"
- "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
- "sync"
- "time"
- )
- // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
- func PrepareTimeWindowProducerQueue() {
- var err error
- subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
- subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
- subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
- subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
- for i, topic := range commonConfig.SubscribeTopics {
- c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
- if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: commonConfig.RosNode,
- Topic: topic,
- Callback: func(data *std_msgs.UInt8) {
- subscribersTimeMutexes[i].Lock()
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
- subscribersMutexes[i].Lock()
- faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
- lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
- var faultLabel string
- for _, f := range masterConfig.RuleOfObstacleDetection {
- faultLabel = f(data)
- if faultLabel != "" {
- if canCollect() {
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- subscribersTimes[i] = time.Now()
- break
- }
- }
- }
- subscribersMutexes[i].Unlock()
- }
- subscribersTimeMutexes[i].Unlock()
- },
- })
- }
- if err != nil {
- c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
- //TODO 如何回传日志
- continue
- }
- }
- select {
- case signal := <-commonService.ChannelKillSubscriber:
- if signal == 1 {
- commonConfig.RosNode.Close()
- commonService.AddKillTimes("3")
- return
- }
- }
- }
- func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) {
- masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
- if lastTimeWindow == nil || commonUtil.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
- // 2-1 如果是不在旧故障窗口内,添加一个新窗口
- newTimeWindow := entity.TimeWindow{
- FaultTime: faultHappenTime,
- TimeWindowBegin: commonUtil.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
- TimeWindowEnd: commonUtil.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
- Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
- Labels: []string{faultLabel},
- MasterTopics: masterTopics,
- SlaveTopics: slaveTopics,
- }
- c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
- entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
- } else {
- // 2-2 如果在旧故障窗口内
- entity.TimeWindowProducerQueueMutex.RLock()
- defer entity.TimeWindowProducerQueueMutex.RUnlock()
- // 2-2-1 更新故障窗口end时间
- maxEnd := commonUtil.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
- expectEnd := commonUtil.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
- if commonUtil.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
- lastTimeWindow.TimeWindowEnd = maxEnd
- lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
- } else {
- if commonUtil.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
- lastTimeWindow.TimeWindowEnd = expectEnd
- lastTimeWindow.Length = commonUtil.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
- }
- }
- // 2-2-2 更新label
- labels := lastTimeWindow.Labels
- lastTimeWindow.Labels = commonUtil.AppendIfNotExists(labels, faultLabel)
- // 2-2-3 更新 topic
- sourceMasterTopics := lastTimeWindow.MasterTopics
- lastTimeWindow.MasterTopics = commonUtil.MergeSlice(sourceMasterTopics, masterTopics)
- sourceSlaveTopics := lastTimeWindow.SlaveTopics
- lastTimeWindow.SlaveTopics = commonUtil.MergeSlice(sourceSlaveTopics, slaveTopics)
- c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
- }
- }
- func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
- // 获取所有需要采集的topic
- var faultCodeTopics []string
- for _, code := range commonConfig.CloudConfig.Triggers {
- if code.Label == faultLabel {
- faultCodeTopics = code.Topics
- }
- }
- return faultCodeTopics, nil
- }
- // 判断采集包数量是否超过限额
- func canCollect() bool {
- responseString, err := commonUtil.HttpPostJsonWithHeaders(
- commonConfig.CloudConfig.CollectLimit.Url,
- map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
- map[string]string{
- "snCode": commonConfig.LocalConfig.SecretKey,
- "collectLimitDay": util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
- "collectLimitWeek": util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
- "collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
- "collectLimitYear": util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
- },
- )
- if err != nil {
- c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
- return false
- }
- // 解析JSON字符串到Response结构体
- var resp entity.Response
- err = json.Unmarshal([]byte(responseString), &resp)
- if err != nil {
- c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
- return false
- }
- if resp.Code != 200 { // 不是200 代表不允许采集
- c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
- return false
- }
- c_log.GlobalLogger.Info("允许采集。")
- return true
- }
|