123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- package svc
- import (
- "cicv-data-closedloop/common/config/c_log"
- "cicv-data-closedloop/pji/common/cfg"
- commonEntity "cicv-data-closedloop/pji/common/ent"
- "cicv-data-closedloop/pji/common/global"
- commonService "cicv-data-closedloop/pji/common/svc"
- "cicv-data-closedloop/pji/common/util"
- masterConfig "cicv-data-closedloop/pji/master/package/cfg"
- "cicv-data-closedloop/pji_msgs"
- "github.com/bluenviron/goroslib/v2"
- "github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs"
- "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
- "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
- "os"
- "sync"
- )
// Per-subscriber mutexes. Each one serializes the message callback of a single
// topic subscriber created in PrepareTimeWindowProducerQueue, so two messages
// of the same topic are never evaluated at the same time. Callbacks of
// different topics use different mutexes and may still run concurrently.

// subscriber0Mutex belongs to the odometry subscriber, which is currently disabled.
var subscriber0Mutex sync.Mutex

// subscriber1Mutex guards the callback of masterConfig.TopicOfObstacleDetection.
var subscriber1Mutex sync.Mutex

// subscriber2Mutex guards the callback of masterConfig.TopicOfSysInfo.
var subscriber2Mutex sync.Mutex

// subscriber3Mutex guards the callback of masterConfig.TopicOfLocateInfo.
var subscriber3Mutex sync.Mutex

// subscriber4Mutex guards the callback of masterConfig.TopicOfImu.
var subscriber4Mutex sync.Mutex

// subscriber5Mutex guards the callback of masterConfig.TopicOfDiagnostics.
var subscriber5Mutex sync.Mutex
- // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
- func PrepareTimeWindowProducerQueue() {
- //log.GlobalLogger.Info("创建订阅者订阅主题", masterConfig.TopicOfOdom)
- //subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
- // Node: cfg.RosNode,
- // Topic: masterConfig.TopicOfOdom,
- // Callback: func(data *nav_msgs.Odometry) {
- // //log.GlobalLogger.Info("话题", masterConfig.TopicOfOdom, "接收到数据", data)
- // subscriber0Mutex.Lock()
- // {
- // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- // lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
- // var faultLabel string
- // for _, f := range masterConfig.RuleOfOdom {
- // faultLabel = f(data)
- // if faultLabel != "" {
- // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- // break
- // }
- // }
- // }
- // subscriber0Mutex.Unlock()
- // },
- //})
- //if err != nil {
- // log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfOdom, "发生故障:", err)
- // os.Exit(-1)
- //}
- c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfObstacleDetection)
- subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: cfg.RosNode,
- Topic: masterConfig.TopicOfObstacleDetection,
- Callback: func(data *std_msgs.UInt8) {
- //c_log.GlobalLogger.Info("话题", masterConfig.TopicOfObstacleDetection, "接收到数据", data)
- subscriber1Mutex.Lock()
- {
- faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
- var faultLabel string
- for _, f := range masterConfig.RuleOfObstacleDetection {
- faultLabel = f(data)
- if faultLabel != "" {
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- break
- }
- }
- }
- subscriber1Mutex.Unlock()
- },
- })
- if err != nil {
- c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
- os.Exit(-1)
- }
- c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfSysInfo)
- subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: cfg.RosNode,
- Topic: masterConfig.TopicOfSysInfo,
- Callback: func(data *pji_msgs.SysInfo) {
- //c_log.GlobalLogger.Info("话题", masterConfig.TopicOfSysInfo, "接收到数据", data)
- subscriber2Mutex.Lock()
- {
- faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
- var faultLabel string
- for _, f := range masterConfig.RuleOfSysInfo {
- faultLabel = f(data)
- if faultLabel != "" {
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- break
- }
- }
- }
- subscriber2Mutex.Unlock()
- },
- })
- if err != nil {
- c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
- os.Exit(-1)
- }
- c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfLocateInfo)
- subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: cfg.RosNode,
- Topic: masterConfig.TopicOfLocateInfo,
- Callback: func(data *pji_msgs.LocateInfo) {
- //c_log.GlobalLogger.Info("话题", masterConfig.TopicOfLocateInfo, "接收到数据", data)
- subscriber3Mutex.Lock()
- {
- faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
- var faultLabel string
- for _, f := range masterConfig.RuleOfLocateInfo {
- faultLabel = f(data)
- if faultLabel != "" {
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- break
- }
- }
- }
- subscriber3Mutex.Unlock()
- },
- })
- if err != nil {
- c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
- os.Exit(-1)
- }
- c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfImu)
- subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: cfg.RosNode,
- Topic: masterConfig.TopicOfImu,
- Callback: func(data *sensor_msgs.Imu) {
- //c_log.GlobalLogger.Info("话题", masterConfig.TopicOfImu, "接收到数据", data)
- subscriber4Mutex.Lock()
- {
- faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
- var faultLabel string
- for _, f := range masterConfig.RuleOfImu {
- faultLabel = f(data)
- if faultLabel != "" {
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- break
- }
- }
- }
- subscriber4Mutex.Unlock()
- },
- })
- if err != nil {
- c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
- os.Exit(-1)
- }
- c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfDiagnostics)
- subscriber5, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: cfg.RosNode,
- Topic: masterConfig.TopicOfDiagnostics,
- Callback: func(data *diagnostic_msgs.DiagnosticArray) {
- //c_log.GlobalLogger.Info("话题", masterConfig.TopicOfDiagnostics, "接收到数据", data)
- subscriber5Mutex.Lock()
- {
- faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
- var faultLabel string
- for _, f := range masterConfig.RuleOfDiagnostics {
- faultLabel = f(data)
- if faultLabel != "" {
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- break
- }
- }
- }
- subscriber5Mutex.Unlock()
- },
- })
- if err != nil {
- c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
- os.Exit(-1)
- }
- select {
- case signal := <-commonService.ChannelKillSubscriber:
- if signal == 1 {
- //subscriber0.Close()
- subscriber1.Close()
- subscriber2.Close()
- subscriber3.Close()
- subscriber4.Close()
- subscriber5.Close()
- cfg.RosNode.Close()
- commonService.AddKillTimes("3")
- return
- }
- }
- }
- func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
- masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
- if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
- // 2-1 如果是不在旧故障窗口内,添加一个新窗口
- newTimeWindow := commonEntity.TimeWindow{
- FaultTime: faultHappenTime,
- TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -cfg.PlatformConfig.TaskBeforeTime),
- TimeWindowEnd: util.TimeCustomChange(faultHappenTime, cfg.PlatformConfig.TaskAfterTime),
- Length: cfg.PlatformConfig.TaskBeforeTime + cfg.PlatformConfig.TaskAfterTime + 1,
- Labels: []string{faultLabel},
- MasterTopics: masterTopics,
- SlaveTopics: slaveTopics,
- }
- c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
- util.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
- } else {
- // 2-2 如果在旧故障窗口内
- global.TimeWindowProducerQueueMutex.RLock()
- defer global.TimeWindowProducerQueueMutex.RUnlock()
- // 2-2-1 更新故障窗口end时间
- maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, cfg.PlatformConfig.TaskMaxTime)
- expectEnd := util.TimeCustomChange(faultHappenTime, cfg.PlatformConfig.TaskAfterTime)
- if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
- lastTimeWindow.TimeWindowEnd = maxEnd
- lastTimeWindow.Length = cfg.PlatformConfig.TaskMaxTime
- } else {
- if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
- lastTimeWindow.TimeWindowEnd = expectEnd
- lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
- }
- }
- // 2-2-2 更新label
- labels := lastTimeWindow.Labels
- lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
- // 2-2-3 更新 topic
- sourceMasterTopics := lastTimeWindow.MasterTopics
- lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
- sourceSlaveTopics := lastTimeWindow.SlaveTopics
- lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
- c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
- }
- }
- func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
- // 获取所有需要采集的topic
- var faultCodeTopics []string
- for _, code := range cfg.CloudConfig.Triggers {
- if code.Label == faultLabel {
- faultCodeTopics = code.Topics
- }
- }
- return faultCodeTopics, nil
- //// 根据不同节点采集的topic进行分配采集
- //for _, acceptTopic := range faultCodeTopics {
- // for _, host := range cfg.CloudConfig.Hosts {
- // for _, topic := range host.Topics {
- // if host.Name == cfg.CloudConfig.Hosts[0].Name && acceptTopic == topic {
- // masterTopics = append(masterTopics, acceptTopic)
- // }
- // }
- // }
- //}
- //return masterTopics, slaveTopics
- }
|