package svc import ( "cicv-data-closedloop/pji/common/cfg" commonEntity "cicv-data-closedloop/pji/common/ent" "cicv-data-closedloop/pji/common/global" "cicv-data-closedloop/pji/common/log" commonService "cicv-data-closedloop/pji/common/svc" "cicv-data-closedloop/pji/common/util" masterConfig "cicv-data-closedloop/pji/master/pkg/cfg" "cicv-data-closedloop/pji_msgs" "github.com/bluenviron/goroslib/v2" "github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs" "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs" "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs" "os" "sync" ) var ( subscriber0Mutex sync.Mutex subscriber1Mutex sync.Mutex subscriber2Mutex sync.Mutex subscriber3Mutex sync.Mutex subscriber4Mutex sync.Mutex subscriber5Mutex sync.Mutex ) // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口 func PrepareTimeWindowProducerQueue() { //log.GlobalLogger.Info("创建订阅者订阅主题", masterConfig.TopicOfOdom) //subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{ // Node: cfg.RosNode, // Topic: masterConfig.TopicOfOdom, // Callback: func(data *nav_msgs.Odometry) { // //log.GlobalLogger.Info("话题", masterConfig.TopicOfOdom, "接收到数据", data) // subscriber0Mutex.Lock() // { // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间 // lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口 // var faultLabel string // for _, f := range masterConfig.RuleOfOdom { // faultLabel = f(data) // if faultLabel != "" { // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow) // break // } // } // } // subscriber0Mutex.Unlock() // }, //}) //if err != nil { // log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfOdom, "发生故障:", err) // os.Exit(-1) //} log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfObstacleDetection) subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: cfg.RosNode, Topic: masterConfig.TopicOfObstacleDetection, Callback: func(data *std_msgs.UInt8) { //log.GlobalLogger.Info("话题", masterConfig.TopicOfObstacleDetection, "接收到数据", data) subscriber1Mutex.Lock() { faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间 lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口 var faultLabel string for _, f := range masterConfig.RuleOfObstacleDetection { faultLabel = f(data) if faultLabel != "" { saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow) break } } } subscriber1Mutex.Unlock() }, }) if err != nil { log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err) os.Exit(-1) } log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfSysInfo) subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: cfg.RosNode, Topic: masterConfig.TopicOfSysInfo, Callback: func(data *pji_msgs.SysInfo) { //log.GlobalLogger.Info("话题", masterConfig.TopicOfSysInfo, "接收到数据", data) subscriber2Mutex.Lock() { faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间 lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口 var faultLabel string for _, f := range masterConfig.RuleOfSysInfo { faultLabel = f(data) if faultLabel != "" { saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow) break } } } subscriber2Mutex.Unlock() }, }) if err != nil { log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err) os.Exit(-1) } log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfLocateInfo) subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: cfg.RosNode, Topic: masterConfig.TopicOfLocateInfo, Callback: func(data *pji_msgs.LocateInfo) { //log.GlobalLogger.Info("话题", masterConfig.TopicOfLocateInfo, "接收到数据", data) subscriber3Mutex.Lock() { faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间 lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口 var faultLabel string for _, f := range masterConfig.RuleOfLocateInfo { faultLabel = f(data) if faultLabel != "" { saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow) break } } } subscriber3Mutex.Unlock() }, }) if err != nil { log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err) os.Exit(-1) } log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfImu) subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: cfg.RosNode, Topic: masterConfig.TopicOfImu, Callback: func(data *sensor_msgs.Imu) { //log.GlobalLogger.Info("话题", masterConfig.TopicOfImu, "接收到数据", data) subscriber4Mutex.Lock() { faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间 lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口 var faultLabel string for _, f := range masterConfig.RuleOfImu { faultLabel = f(data) if faultLabel != "" { saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow) break } } } subscriber4Mutex.Unlock() }, }) if err != nil { log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err) os.Exit(-1) } log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfDiagnostics) subscriber5, err := goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: cfg.RosNode, Topic: masterConfig.TopicOfDiagnostics, Callback: func(data *diagnostic_msgs.DiagnosticArray) { //log.GlobalLogger.Info("话题", masterConfig.TopicOfDiagnostics, "接收到数据", data) subscriber5Mutex.Lock() { faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间 lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口 var faultLabel string for _, f := range masterConfig.RuleOfDiagnostics { faultLabel = f(data) if faultLabel != "" { saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow) break } } } subscriber5Mutex.Unlock() }, }) if err != nil { log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err) os.Exit(-1) } select { case signal := <-commonService.ChannelKillSubscriber: if signal == 1 { //subscriber0.Close() subscriber1.Close() subscriber2.Close() subscriber3.Close() subscriber4.Close() subscriber5.Close() cfg.RosNode.Close() commonService.AddKillTimes("3") return } } } func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) { masterTopics, slaveTopics := getTopicsOfNode(faultLabel) if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) { // 2-1 如果是不在旧故障窗口内,添加一个新窗口 newTimeWindow := commonEntity.TimeWindow{ FaultTime: faultHappenTime, TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -cfg.PlatformConfig.TaskBeforeTime), TimeWindowEnd: util.TimeCustomChange(faultHappenTime, cfg.PlatformConfig.TaskAfterTime), Length: cfg.PlatformConfig.TaskBeforeTime + cfg.PlatformConfig.TaskAfterTime + 1, Labels: []string{faultLabel}, MasterTopics: masterTopics, SlaveTopics: slaveTopics, } log.GlobalLogger.Info("不在旧故障窗口内,向生产者队列添加一个新窗口:", newTimeWindow) util.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow) } else { // 2-2 如果在旧故障窗口内 global.TimeWindowProducerQueueMutex.RLock() defer global.TimeWindowProducerQueueMutex.RUnlock() log.GlobalLogger.Info("在旧故障窗口内,更新生产者队列最新的窗口 - 开始:", lastTimeWindow) // 2-2-1 更新故障窗口end时间 { maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, cfg.PlatformConfig.TaskMaxTime) expectEnd := util.TimeCustomChange(faultHappenTime, cfg.PlatformConfig.TaskAfterTime) if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) { lastTimeWindow.TimeWindowEnd = maxEnd lastTimeWindow.Length = cfg.PlatformConfig.TaskMaxTime } else { if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) { lastTimeWindow.TimeWindowEnd = expectEnd lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd) } } } // 2-2-2 更新label { labels := lastTimeWindow.Labels lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel) } // 2-2-3 更新 topic { sourceMasterTopics := lastTimeWindow.MasterTopics lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics) sourceSlaveTopics := lastTimeWindow.SlaveTopics lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics) } log.GlobalLogger.Info("在旧故障窗口内,更新窗口 - 结束:", *lastTimeWindow) } } func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) { // 获取所有需要采集的topic var faultCodeTopics []string for _, code := range cfg.CloudConfig.Triggers { if code.Label == faultLabel { faultCodeTopics = code.Topics } } return faultCodeTopics, nil //// 根据不同节点采集的topic进行分配采集 //for _, acceptTopic := range faultCodeTopics { // for _, host := range cfg.CloudConfig.Hosts { // for _, topic := range host.Topics { // if host.Name == cfg.CloudConfig.Hosts[0].Name && acceptTopic == topic { // masterTopics = append(masterTopics, acceptTopic) // } // } // } //} //return masterTopics, slaveTopics }