package svc

import (
	"os"
	"sync"
	"time"

	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
	"cicv-data-closedloop/kinglong/common/ent"
	"cicv-data-closedloop/kinglong/common/global"
	"cicv-data-closedloop/kinglong/common/log"
	"cicv-data-closedloop/kinglong/common/svc"
	"cicv-data-closedloop/kinglong/common/util"
	masterConfig "cicv-data-closedloop/kinglong/master/pkg/cfg"
	masterEntity "cicv-data-closedloop/kinglong_msgs"
	"github.com/bluenviron/goroslib/v2"
)

var (
	// One mutex per subscriber: each serialises the rule evaluation and
	// time-window update performed inside that subscriber's callback.
	subscriber0Mutex sync.Mutex
	subscriber1Mutex sync.Mutex
	subscriber2Mutex sync.Mutex
	subscriber3Mutex sync.Mutex
	subscriber4Mutex sync.Mutex

	// m guards velocityX, velocityY and yaw. They are written by the
	// cicv_location callback and read by the tpperception callback.
	m         sync.RWMutex
	velocityX float64
	velocityY float64
	yaw       float64
)

// PrepareTimeWindowProducerQueue subscribes to every monitored topic and, for
// each received message, evaluates the configured trigger rules; the first
// rule that yields a non-empty label creates or extends a time window via
// saveTimeWindow.
//
// The call blocks until a value arrives on svc.ChannelKillWindowProducer. When
// that value is 1, all subscribers and the ROS node are closed and
// svc.AddKillTimes("3") is invoked. If a subscriber cannot be created the
// process exits with status -1.
func PrepareTimeWindowProducerQueue() {
	log.GlobalLogger.Info("订阅者 goroutine,启动。")

	// Subscriber 0: topic nodefault_info.
	log.GlobalLogger.Info("创建订阅者0订阅话题 ", masterConfig.TopicOfNodeFaultInfo)
	subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
		Node:  commonConfig.RosNode,
		Topic: masterConfig.TopicOfNodeFaultInfo,
		Callback: func(data *masterEntity.FaultInfo) {
			if len(masterConfig.RuleOfNodefaultInfo) == 0 {
				// No triggers for nodefault_info; logging is left disabled here.
				return
			}

			// Throttle consecutive fault codes: a message arriving less than
			// 2 seconds after the previous one is dropped. The timestamp is
			// refreshed on every message, dropped or not.
			global.Subscriber0TimeMutex.Lock()
			gap := time.Since(global.Subscriber0Time).Seconds()
			global.Subscriber0Time = time.Now()
			global.Subscriber0TimeMutex.Unlock()
			if gap < 2 {
				return
			}

			subscriber0Mutex.Lock()
			defer subscriber0Mutex.Unlock()

			faultHappenTime := util.GetNowTimeCustom() // time at which this fault was observed
			lastTimeWindow := util.GetLastTimeWindow() // most recent time window, if any
			for _, f := range masterConfig.RuleOfNodefaultInfo {
				if faultLabel := f(data); faultLabel != "" {
					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
					break
				}
			}
		},
	})
	if err != nil {
		log.GlobalLogger.Info("创建订阅者0发生故障:", err)
		os.Exit(-1)
	}

	// Subscriber 1: topic cicv_location.
	log.GlobalLogger.Info("创建订阅者1订阅话题 ", masterConfig.TopicOfCicvLocation)
	subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
		Node:  commonConfig.RosNode,
		Topic: masterConfig.TopicOfCicvLocation,
		Callback: func(data *masterEntity.PerceptionLocalization) {
			// NOTE(review): returning here also skips the velocity/yaw update
			// below, so tpperception rules see stale values when no
			// cicv_location rule is configured. Confirm this is intended.
			if len(masterConfig.RuleOfCicvLocation) == 0 {
				log.GlobalLogger.Info("话题 cicv_location 没有触发器")
				return
			}

			subscriber1Mutex.Lock()
			defer subscriber1Mutex.Unlock()

			faultHappenTime := util.GetNowTimeCustom() // time at which this fault was observed
			lastTimeWindow := util.GetLastTimeWindow() // most recent time window, if any

			// Publish the latest pose for the tpperception callback. This is a
			// write, so the exclusive lock is required (a read lock would let
			// readers and other writers run concurrently).
			m.Lock()
			velocityX = data.VelocityX
			velocityY = data.VelocityY
			yaw = data.Yaw
			m.Unlock()

			for _, f := range masterConfig.RuleOfCicvLocation {
				if faultLabel := f(data); faultLabel != "" {
					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
					break
				}
			}
		},
	})
	if err != nil {
		log.GlobalLogger.Info("创建订阅者1发生故障:", err)
		os.Exit(-1)
	}

	// Subscriber 2: topic tpperception.
	log.GlobalLogger.Info("创建订阅者2订阅话题 ", masterConfig.TopicOfTpperception)
	subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
		Node:  commonConfig.RosNode,
		Topic: masterConfig.TopicOfTpperception,
		Callback: func(data *masterEntity.PerceptionObjects) {
			if len(masterConfig.RuleOfTpperception) == 0 {
				log.GlobalLogger.Info("话题 tpperception 没有触发器")
				return
			}

			// Throttle: handle at most one message per 10 seconds. Unlike
			// subscribers 0 and 3, the timestamp is only refreshed when a
			// message is accepted.
			global.Subscriber2TimeMutex.Lock()
			if time.Since(global.Subscriber2Time).Seconds() < 10 {
				global.Subscriber2TimeMutex.Unlock()
				return
			}
			global.Subscriber2Time = time.Now()
			global.Subscriber2TimeMutex.Unlock()

			subscriber2Mutex.Lock()
			defer subscriber2Mutex.Unlock()

			faultHappenTime := util.GetNowTimeCustom() // time at which this fault was observed
			lastTimeWindow := util.GetLastTimeWindow() // most recent time window, if any

			// Take a consistent snapshot of the pose written by subscriber 1.
			m.RLock()
			vx, vy, heading := velocityX, velocityY, yaw
			m.RUnlock()

			for _, f := range masterConfig.RuleOfTpperception {
				if faultLabel := f(data, vx, vy, heading); faultLabel != "" {
					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
					break
				}
			}
		},
	})
	if err != nil {
		log.GlobalLogger.Info("创建订阅者2发生故障:", err)
		os.Exit(-1)
	}

	// Subscriber 3: topic fault_info.
	log.GlobalLogger.Info("创建订阅者3订阅话题 ", masterConfig.TopicOfFaultInfo)
	subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
		Node:  commonConfig.RosNode,
		Topic: masterConfig.TopicOfFaultInfo,
		Callback: func(data *masterEntity.FaultVec) {
			if len(masterConfig.RuleOfFaultInfo) == 0 {
				log.GlobalLogger.Info("话题 fault_info 没有触发器")
				return
			}

			// Throttle consecutive fault codes: a message arriving less than
			// 2 seconds after the previous one is dropped. The timestamp is
			// refreshed on every message, dropped or not.
			global.Subscriber3TimeMutex.Lock()
			gap := time.Since(global.Subscriber3Time).Seconds()
			global.Subscriber3Time = time.Now()
			global.Subscriber3TimeMutex.Unlock()
			if gap < 2 {
				return
			}

			subscriber3Mutex.Lock()
			defer subscriber3Mutex.Unlock()

			faultHappenTime := util.GetNowTimeCustom() // time at which this fault was observed
			lastTimeWindow := util.GetLastTimeWindow() // most recent time window, if any
			for _, f := range masterConfig.RuleOfFaultInfo {
				if faultLabel := f(data); faultLabel != "" {
					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
					break
				}
			}
		},
	})
	if err != nil {
		log.GlobalLogger.Info("创建订阅者3发生故障:", err)
		os.Exit(-1)
	}

	// Subscriber 4: topic data_read.
	// TODO: this topic triggers at high frequency.
	log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead)
	subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
		Node:  commonConfig.RosNode,
		Topic: masterConfig.TopicOfDataRead,
		Callback: func(data *masterEntity.Retrieval) {
			if len(masterConfig.RuleOfDataRead) == 0 {
				// No triggers for data_read; logging is left disabled here.
				return
			}

			subscriber4Mutex.Lock()
			defer subscriber4Mutex.Unlock()

			faultHappenTime := util.GetNowTimeCustom() // time at which this fault was observed
			lastTimeWindow := util.GetLastTimeWindow() // most recent time window, if any
			for _, f := range masterConfig.RuleOfDataRead {
				if faultLabel := f(data); faultLabel != "" {
					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
					break
				}
			}
		},
	})
	if err != nil {
		// Report the correct subscriber index (previously logged as 3).
		log.GlobalLogger.Info("创建订阅者4发生故障:", err)
		os.Exit(-1)
	}

	// Block until a kill signal arrives.
	// NOTE(review): any value other than 1 makes this function return without
	// closing the subscribers, matching the previous behaviour. Confirm that
	// only 1 is ever sent on this channel.
	signal := <-svc.ChannelKillWindowProducer
	if signal != 1 {
		return
	}
	defer svc.AddKillTimes("3")
	subscriber0.Close()
	subscriber1.Close()
	subscriber2.Close()
	subscriber3.Close()
	subscriber4.Close()
	commonConfig.RosNode.Close()
}

// saveTimeWindow records a fault labelled faultLabel that happened at
// faultHappenTime.
//
// If lastTimeWindow is nil, or the fault happened after its end, a new window
// is built around the fault time and appended to the producer queue. Otherwise
// lastTimeWindow is extended in place: its end is pushed out to cover the
// fault (capped at TaskMaxTime from the window's begin), and the fault's label
// and topics are merged into it.
func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *ent.TimeWindow) {
	masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
	platform := commonConfig.PlatformConfig

	// Case 1: the fault lies outside the last window, so start a new one.
	if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
		newTimeWindow := ent.TimeWindow{
			FaultTime:       faultHappenTime,
			TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -platform.TaskBeforeTime),
			TimeWindowEnd:   util.TimeCustomChange(faultHappenTime, platform.TaskAfterTime),
			Length:          platform.TaskBeforeTime + platform.TaskAfterTime + 1,
			Labels:          []string{faultLabel},
			MasterTopics:    masterTopics,
			SlaveTopics:     slaveTopics,
		}
		log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
		util.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
		return
	}

	// Case 2: the fault lies inside the last window, so update that window.
	// The window is mutated and callbacks of different subscribers can get
	// here concurrently, so the exclusive lock is taken rather than the read
	// lock.
	global.TimeWindowProducerQueueMutex.Lock()
	defer global.TimeWindowProducerQueueMutex.Unlock()

	// Extend the window end, never beyond TaskMaxTime from its begin.
	maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, platform.TaskMaxTime)
	expectEnd := util.TimeCustomChange(faultHappenTime, platform.TaskAfterTime)
	if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
		lastTimeWindow.TimeWindowEnd = maxEnd
		lastTimeWindow.Length = platform.TaskMaxTime
	} else if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
		lastTimeWindow.TimeWindowEnd = expectEnd
		lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
	}

	// Merge the label and the topics of this fault into the window.
	lastTimeWindow.Labels = util.AppendIfNotExists(lastTimeWindow.Labels, faultLabel)
	lastTimeWindow.MasterTopics = util.MergeSlice(lastTimeWindow.MasterTopics, masterTopics)
	lastTimeWindow.SlaveTopics = util.MergeSlice(lastTimeWindow.SlaveTopics, slaveTopics)
	log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
}

// getTopicsOfNode returns the topics to collect for faultLabel, split by the
// host that provides them: topics listed under the first configured host are
// returned as masterTopics, topics listed under the second configured host as
// slaveTopics. Hosts are matched by name. When several triggers carry the same
// label, the topics of the last one are used.
//
// Both results are nil when the label has no topics or no host is configured;
// slaveTopics is nil when only one host is configured.
func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
	// Collect the topics requested by the trigger with this label.
	var faultCodeTopics []string
	for _, code := range commonConfig.CloudConfig.Triggers {
		if code.Label == faultLabel {
			faultCodeTopics = code.Topics
		}
	}

	hosts := commonConfig.CloudConfig.Hosts
	if len(faultCodeTopics) == 0 || len(hosts) == 0 {
		return nil, nil
	}

	masterName := hosts[0].Name
	// Guard the second host: indexing hosts[1] unconditionally would panic on
	// a single-host configuration.
	hasSlave := len(hosts) > 1
	var slaveName string
	if hasSlave {
		slaveName = hosts[1].Name
	}

	// Assign each requested topic to the host(s) that list it.
	for _, acceptTopic := range faultCodeTopics {
		for _, host := range hosts {
			for _, topic := range host.Topics {
				if acceptTopic != topic {
					continue
				}
				if host.Name == masterName {
					masterTopics = append(masterTopics, acceptTopic)
				}
				if hasSlave && host.Name == slaveName {
					slaveTopics = append(slaveTopics, acceptTopic)
				}
			}
		}
	}
	return masterTopics, slaveTopics
}