package svc import ( commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config" "cicv-data-closedloop/aarch64/pjisuv/common/service" masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/package/config" "cicv-data-closedloop/common/config/c_log" "cicv-data-closedloop/common/entity" "cicv-data-closedloop/common/util" "cicv-data-closedloop/pjisuv_msgs" "github.com/bluenviron/goroslib/v2" "os" "sync" "time" ) var ( subscriber0Mutex sync.Mutex subscriber1Mutex sync.Mutex subscriber2Mutex sync.Mutex subscriber3Mutex sync.Mutex subscriber4Mutex sync.Mutex m sync.RWMutex velocityX float64 velocityY float64 yaw float64 ) // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口 func PrepareTimeWindowProducerQueue() { c_log.GlobalLogger.Info("订阅者 goroutine,启动。") //创建订阅者0订阅主题 nodefault_info c_log.GlobalLogger.Info("创建订阅者0订阅话题 ", masterConfig.TopicOfNodeFaultInfo) subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: masterConfig.TopicOfNodeFaultInfo, Callback: func(data *pjisuv_msgs.FaultInfo) { if len(masterConfig.RuleOfNodefaultInfo) == 0 { //c_log.GlobalLogger.Info("话题 nodefault_info没有触发器") return } entity.Subscriber0TimeMutex.Lock() // 判断是否是连续故障码 gap := time.Since(entity.Subscriber0Time).Seconds() if gap < 2 { entity.Subscriber0Time = time.Now() entity.Subscriber0TimeMutex.Unlock() return } else { // 2 不是连续故障码 entity.Subscriber0Time = time.Now() entity.Subscriber0TimeMutex.Unlock() subscriber0Mutex.Lock() faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间 lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口 var faultLabel string for _, f := range masterConfig.RuleOfNodefaultInfo { faultLabel = f(data) if faultLabel != "" { saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow) break } } subscriber0Mutex.Unlock() } }}) if err != nil { c_log.GlobalLogger.Info("创建订阅者0发生故障:", err) os.Exit(-1) } // 创建订阅者1订阅主题 cicv_location c_log.GlobalLogger.Info("创建订阅者1订阅话题 ", masterConfig.TopicOfCicvLocation) subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: masterConfig.TopicOfCicvLocation, Callback: func(data *pjisuv_msgs.PerceptionLocalization) { if len(masterConfig.RuleOfCicvLocation) == 0 { c_log.GlobalLogger.Info("话题 cicv_location 没有触发器") return } subscriber1Mutex.Lock() faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间 lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口 // 更新共享变量 m.RLock() { velocityX = data.VelocityX velocityY = data.VelocityY yaw = data.Yaw } m.RUnlock() var faultLabel string for _, f := range masterConfig.RuleOfCicvLocation { faultLabel = f(data) if faultLabel != "" { saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow) break } } subscriber1Mutex.Unlock() }, }) if err != nil { c_log.GlobalLogger.Info("创建订阅者1发生故障:", err) os.Exit(-1) } // 创建订阅者2订阅主题 tpperception c_log.GlobalLogger.Info("创建订阅者2订阅话题 ", masterConfig.TopicOfTpperception) subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: masterConfig.TopicOfTpperception, Callback: func(data *pjisuv_msgs.PerceptionObjects) { if len(masterConfig.RuleOfTpperception) == 0 { c_log.GlobalLogger.Info("话题 tpperception 没有触发器") return } entity.Subscriber2TimeMutex.Lock() // 判断是否是连续故障码 gap := time.Since(entity.Subscriber2Time).Seconds() if gap > 1 { // 2 不是连续故障码 subscriber2Mutex.Lock() faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间 lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口 var faultLabel string c_log.GlobalLogger.Infof("TTC数据为:【velocityX】=%v,【velocityY】=%v", velocityX, velocityY) for _, f := range masterConfig.RuleOfTpperception { faultLabel = f(data, velocityX, velocityY, yaw) if faultLabel != "" { saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow) entity.Subscriber2Time = time.Now() break } } subscriber2Mutex.Unlock() } entity.Subscriber2TimeMutex.Unlock() }}) if err != nil { c_log.GlobalLogger.Info("创建订阅者2发生故障:", err) os.Exit(-1) } // 创建订阅者3订阅主题 fault_info c_log.GlobalLogger.Info("创建订阅者3订阅话题 ", masterConfig.TopicOfFaultInfo) subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: masterConfig.TopicOfFaultInfo, Callback: func(data *pjisuv_msgs.FaultVec) { if len(masterConfig.RuleOfFaultInfo) == 0 { c_log.GlobalLogger.Info("话题 fault_info 没有触发器") return } entity.Subscriber3TimeMutex.Lock() // 判断是否是连续故障码 gap := time.Since(entity.Subscriber3Time).Seconds() if gap < 2 { entity.Subscriber3Time = time.Now() entity.Subscriber3TimeMutex.Unlock() return } else { // 2 不是连续故障码 entity.Subscriber3Time = time.Now() entity.Subscriber3TimeMutex.Unlock() subscriber3Mutex.Lock() { faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间 lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口 var faultLabel string for _, f := range masterConfig.RuleOfFaultInfo { faultLabel = f(data) if faultLabel != "" { saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow) break } } } subscriber3Mutex.Unlock() } }}) if err != nil { c_log.GlobalLogger.Info("创建订阅者3发生故障:", err) os.Exit(-1) } // 创建订阅者4订阅主题 data_read // TODO 高频率触发 c_log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead) subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: masterConfig.TopicOfDataRead, Callback: func(data *pjisuv_msgs.Retrieval) { if len(masterConfig.RuleOfDataRead) == 0 { //c_log.GlobalLogger.Info("话题 data_read 没有触发器") return } subscriber4Mutex.Lock() faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间 lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口 var faultLabel string for _, f := range masterConfig.RuleOfDataRead { faultLabel = f(data) if faultLabel != "" { saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow) break } } subscriber4Mutex.Unlock() }, }) if err != nil { c_log.GlobalLogger.Info("创建订阅者3发生故障:", err) os.Exit(-1) } select { case signal := <-service.ChannelKillWindowProducer: if signal == 1 { defer service.AddKillTimes("3") subscriber0.Close() subscriber1.Close() subscriber2.Close() subscriber3.Close() subscriber4.Close() commonConfig.RosNode.Close() return } } } func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) { masterTopics, slaveTopics := getTopicsOfNode(faultLabel) if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) { // 2-1 如果是不在旧故障窗口内,添加一个新窗口 newTimeWindow := entity.TimeWindow{ FaultTime: faultHappenTime, TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime), TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime), Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1, Labels: []string{faultLabel}, MasterTopics: masterTopics, SlaveTopics: slaveTopics, } c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length) entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow) } else { // 2-2 如果在旧故障窗口内 entity.TimeWindowProducerQueueMutex.RLock() defer entity.TimeWindowProducerQueueMutex.RUnlock() // 2-2-1 更新故障窗口end时间 maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime) expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime) if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) { lastTimeWindow.TimeWindowEnd = maxEnd lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime } else { if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) { lastTimeWindow.TimeWindowEnd = expectEnd lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd) } } // 2-2-2 更新label labels := lastTimeWindow.Labels lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel) // 2-2-3 更新 topic sourceMasterTopics := lastTimeWindow.MasterTopics lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics) sourceSlaveTopics := lastTimeWindow.SlaveTopics lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics) c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length) } } func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) { // 获取所有需要采集的topic var faultCodeTopics []string for _, code := range commonConfig.CloudConfig.Triggers { if code.Label == faultLabel { faultCodeTopics = code.Topics } } // 根据不同节点采集的topic进行分配采集 for _, acceptTopic := range faultCodeTopics { for _, host := range commonConfig.CloudConfig.Hosts { for _, topic := range host.Topics { if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic { masterTopics = append(masterTopics, acceptTopic) } if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic { slaveTopics = append(slaveTopics, acceptTopic) } } } } return masterTopics, slaveTopics }