123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- package service
- import (
- commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
- "cicv-data-closedloop/aarch64/kinglong/common/service"
- masterConfig "cicv-data-closedloop/aarch64/kinglong/master/package/config"
- "cicv-data-closedloop/common/config/c_log"
- commonEntity "cicv-data-closedloop/common/entity"
- "cicv-data-closedloop/common/util"
- "cicv-data-closedloop/kinglong_msgs"
- "github.com/bluenviron/goroslib/v2"
- "sync"
- "time"
- )
- var (
- extendParam commonEntity.KinglongParam
- // /cicv_location
- mutexOfCicvLocation sync.RWMutex
- // /tpperception
- mutexOfTpperception sync.RWMutex
- // /pj_control_pub
- mutexOfPjControlPub sync.RWMutex
- )
- // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
- func PrepareTimeWindowProducerQueue() {
- c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
- var err error
- subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
- subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
- subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
- subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
- for i, topic := range commonConfig.SubscribeTopics {
- // !!!扩展的定时任务监听,牛逼的设计!!!扩展性拉满啦
- if topic == masterConfig.TopicOfCicvExtend {
- for {
- time.Sleep(time.Duration(3500) * time.Millisecond)
- for _, f := range masterConfig.RuleOfCicvExtend {
- label := f(extendParam)
- if label != "" {
- saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
- break
- }
- }
- }
- }
- c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
- if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: commonConfig.RosNode,
- Topic: topic,
- Callback: func(data *kinglong_msgs.PerceptionLocalization) {
- // 更新共享变量
- mutexOfCicvLocation.RLock()
- {
- extendParam.VelocityYOfCicvLocation = data.VelocityX
- extendParam.VelocityYOfCicvLocation = data.VelocityY
- extendParam.YawOfCicvLocation = data.Yaw
- extendParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
- }
- mutexOfCicvLocation.RUnlock()
- subscribersTimeMutexes[i].Lock()
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
- subscribersMutexes[i].Lock()
- faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
- var faultLabel string
- for _, f := range masterConfig.RuleOfCicvLocation {
- faultLabel = f(data)
- if faultLabel != "" {
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- break
- }
- }
- subscribersMutexes[i].Unlock()
- }
- subscribersTimeMutexes[i].Unlock()
- },
- })
- }
- if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: commonConfig.RosNode,
- Topic: topic,
- Callback: func(data *kinglong_msgs.PerceptionObjects) {
- subscribersTimeMutexes[i].Lock()
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
- subscribersMutexes[i].Lock()
- faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
- var faultLabel string
- for _, f := range masterConfig.RuleOfTpperception {
- faultLabel = f(data, extendParam)
- if faultLabel != "" {
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- break
- }
- }
- subscribersMutexes[i].Unlock()
- }
- subscribersTimeMutexes[i].Unlock()
- },
- })
- }
- if topic == masterConfig.TopicOfDataRead {
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: commonConfig.RosNode,
- Topic: topic,
- Callback: func(data *kinglong_msgs.Retrieval) {
- subscribersTimeMutexes[i].Lock()
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
- subscribersMutexes[i].Lock()
- faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
- var faultLabel string
- for _, f := range masterConfig.RuleOfDataRead {
- faultLabel = f(data)
- if faultLabel != "" {
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- subscribersTimes[i] = time.Now()
- break
- }
- }
- subscribersMutexes[i].Unlock()
- }
- subscribersTimeMutexes[i].Unlock()
- },
- })
- }
- if err != nil {
- c_log.GlobalLogger.Info("创建订阅者报错:", err)
- //TODO 如何回传日志
- continue
- }
- }
- ////创建订阅者0订阅主题 nodefault_info
- //c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfNodeFaultInfo)
- //subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
- // Node: commonConfig.RosNode,
- // Topic: masterConfig.TopicOfNodeFaultInfo,
- // Callback: func(data *kinglong_msgs.FaultInfo) {
- // if len(masterConfig.RuleOfNodefaultInfo) == 0 {
- // //c_log.GlobalLogger.Info("话题 nodefault_info没有触发器")
- // return
- // }
- // commonEntity.Subscriber0TimeMutex.Lock()
- // if time.Since(commonEntity.Subscriber0Time).Seconds() > 1 {
- // commonEntity.Subscriber0TimeMutex.Unlock()
- // subscriber0Mutex.Lock()
- // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
- // var faultLabel string
- // for _, f := range masterConfig.RuleOfNodefaultInfo {
- // faultLabel = f(data)
- // if faultLabel != "" {
- // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- // commonEntity.Subscriber0Time = time.Now()
- // break
- // }
- // }
- // subscriber0Mutex.Unlock()
- // }
- // commonEntity.Subscriber0TimeMutex.Unlock()
- // }})
- //if err != nil {
- // c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
- // os.Exit(-1)
- //}
- //// 创建订阅者1订阅主题 cicv_location
- //c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfCicvLocation)
- //subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
- // Node: commonConfig.RosNode,
- // Topic: masterConfig.TopicOfCicvLocation,
- // Callback: func(data *kinglong_msgs.PerceptionLocalization) {
- // m.RLock()
- // velocityX = data.VelocityX
- // velocityY = data.VelocityY
- // yaw = data.Yaw
- // m.RUnlock()
- //
- // if len(masterConfig.RuleOfCicvLocation) == 0 {
- // c_log.GlobalLogger.Info("话题 cicv_location 没有触发器")
- // return
- // }
- // commonEntity.Subscriber1TimeMutex.Lock()
- // if time.Since(commonEntity.Subscriber1Time).Seconds() > 1 {
- // subscriber1Mutex.Lock()
- // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
- // // 更新共享变量
- // var faultLabel string
- // for _, f := range masterConfig.RuleOfCicvLocation {
- // faultLabel = f(data)
- // if faultLabel != "" {
- // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- // commonEntity.Subscriber1Time = time.Now()
- // break
- // }
- // }
- // subscriber1Mutex.Unlock()
- // }
- // commonEntity.Subscriber1TimeMutex.Unlock()
- // },
- //})
- //if err != nil {
- // c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
- // os.Exit(-1)
- //}
- //c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfTpperception)
- //subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
- // Node: commonConfig.RosNode,
- // Topic: masterConfig.TopicOfTpperception,
- // Callback: func(data *kinglong_msgs.PerceptionObjects) {
- // if len(masterConfig.RuleOfTpperception) == 0 {
- // c_log.GlobalLogger.Info("话题 tpperception 没有触发器")
- // return
- // }
- // commonEntity.Subscriber2TimeMutex.Lock()
- // // 判断是否是连续故障码
- // if time.Since(commonEntity.Subscriber2Time).Seconds() > 1 {
- // // 2 不是连续故障码
- // subscriber2Mutex.Lock()
- // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
- // var faultLabel string
- // //c_log.GlobalLogger.Infof("TTC数据为:【velocityX】=%v,【velocityY】=%v", velocityX, velocityY)
- // for _, f := range masterConfig.RuleOfTpperception {
- // faultLabel = f(data, velocityX, velocityY, yaw)
- // if faultLabel != "" {
- // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- // commonEntity.Subscriber2Time = time.Now()
- // break
- // }
- // }
- // subscriber2Mutex.Unlock()
- // }
- // commonEntity.Subscriber2TimeMutex.Unlock()
- // }})
- //if err != nil {
- // c_log.GlobalLogger.Info("创建订阅者2发生故障:", err)
- // os.Exit(-1)
- //}
- //// 创建订阅者3订阅主题 fault_info
- //c_log.GlobalLogger.Info("创建订阅者3订阅话题 ", masterConfig.TopicOfFaultInfo)
- //subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
- // Node: commonConfig.RosNode,
- // Topic: masterConfig.TopicOfFaultInfo,
- // Callback: func(data *kinglong_msgs.FaultVec) {
- // if len(masterConfig.RuleOfFaultInfo) == 0 {
- // c_log.GlobalLogger.Info("话题 fault_info 没有触发器")
- // return
- // }
- // commonEntity.Subscriber3TimeMutex.Lock()
- // if time.Since(commonEntity.Subscriber3Time).Seconds() > 1 {
- // // 2 不是连续故障码
- // subscriber3Mutex.Lock()
- // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
- // var faultLabel string
- // for _, f := range masterConfig.RuleOfFaultInfo {
- // faultLabel = f(data)
- // if faultLabel != "" {
- // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- // commonEntity.Subscriber3Time = time.Now()
- // break
- // }
- // }
- // subscriber3Mutex.Unlock()
- // }
- // commonEntity.Subscriber3TimeMutex.Unlock()
- // }})
- //if err != nil {
- // c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
- // os.Exit(-1)
- //}
- //// 创建订阅者4订阅主题 data_read
- //// TODO 高频率触发
- //c_log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead)
- //subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
- // Node: commonConfig.RosNode,
- // Topic: masterConfig.TopicOfDataRead,
- // Callback: func(data *kinglong_msgs.Retrieval) {
- // if len(masterConfig.RuleOfDataRead) == 0 {
- // //c_log.GlobalLogger.Info("话题 data_read 没有触发器")
- // return
- // }
- // commonEntity.Subscriber4TimeMutex.Lock()
- // if time.Since(commonEntity.Subscriber4Time).Seconds() > 1 {
- // subscriber4Mutex.Lock()
- // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
- // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
- // var faultLabel string
- // for _, f := range masterConfig.RuleOfDataRead {
- // faultLabel = f(data)
- // if faultLabel != "" {
- // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
- // commonEntity.Subscriber4Time = time.Now()
- // break
- // }
- // }
- // subscriber4Mutex.Unlock()
- // }
- // commonEntity.Subscriber4TimeMutex.Unlock()
- // },
- //})
- //if err != nil {
- // c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
- // os.Exit(-1)
- //}
- select {
- case signal := <-service.ChannelKillWindowProducer:
- if signal == 1 {
- commonConfig.RosNode.Close()
- service.AddKillTimes("3")
- return
- }
- }
- }
- func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
- masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
- if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
- // 2-1 如果是不在旧故障窗口内,添加一个新窗口
- newTimeWindow := commonEntity.TimeWindow{
- FaultTime: faultHappenTime,
- TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
- TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
- Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
- Labels: []string{faultLabel},
- MasterTopics: masterTopics,
- SlaveTopics: slaveTopics,
- }
- c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
- commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
- } else {
- // 2-2 如果在旧故障窗口内
- commonEntity.TimeWindowProducerQueueMutex.RLock()
- defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
- // 2-2-1 更新故障窗口end时间
- maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
- expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
- if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
- lastTimeWindow.TimeWindowEnd = maxEnd
- lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
- } else {
- if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
- lastTimeWindow.TimeWindowEnd = expectEnd
- lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
- }
- }
- // 2-2-2 更新label
- labels := lastTimeWindow.Labels
- lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
- // 2-2-3 更新 topic
- sourceMasterTopics := lastTimeWindow.MasterTopics
- lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
- sourceSlaveTopics := lastTimeWindow.SlaveTopics
- lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
- c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
- }
- }
- func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
- // 获取所有需要采集的topic
- var faultCodeTopics []string
- for _, code := range commonConfig.CloudConfig.Triggers {
- if code.Label == faultLabel {
- faultCodeTopics = code.Topics
- }
- }
- // 根据不同节点采集的topic进行分配采集
- for _, acceptTopic := range faultCodeTopics {
- for _, host := range commonConfig.CloudConfig.Hosts {
- for _, topic := range host.Topics {
- if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
- masterTopics = append(masterTopics, acceptTopic)
- }
- if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
- slaveTopics = append(slaveTopics, acceptTopic)
- }
- }
- }
- }
- return masterTopics, slaveTopics
- }
|