|
@@ -1,382 +0,0 @@
|
|
|
-package service
|
|
|
-
|
|
|
-import (
|
|
|
- commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
|
|
|
- "cicv-data-closedloop/aarch64/kinglong/common/service"
|
|
|
- masterConfig "cicv-data-closedloop/aarch64/kinglong/master/package/config"
|
|
|
- "cicv-data-closedloop/common/config/c_log"
|
|
|
- commonEntity "cicv-data-closedloop/common/entity"
|
|
|
- "cicv-data-closedloop/common/util"
|
|
|
- "cicv-data-closedloop/kinglong_msgs"
|
|
|
- "cicv-data-closedloop/kinglong_param"
|
|
|
- "github.com/bluenviron/goroslib/v2"
|
|
|
- "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
|
|
|
- "math"
|
|
|
- "sync"
|
|
|
- "time"
|
|
|
-)
|
|
|
-
|
|
|
-var (
|
|
|
- extendParam kinglong_param.KinglongParam
|
|
|
-
|
|
|
- mutexOfCicvLocation sync.RWMutex
|
|
|
-
|
|
|
- mutexOfTpperception sync.RWMutex
|
|
|
-
|
|
|
- mutexOfJinlongControlPub sync.RWMutex
|
|
|
-
|
|
|
- mutexOfDataRead sync.RWMutex
|
|
|
-)
|
|
|
-
|
|
|
-
|
|
|
-func PrepareTimeWindowProducerQueue() {
|
|
|
- c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
|
|
|
-
|
|
|
- var err error
|
|
|
- subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
|
|
|
- subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
|
|
|
- subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
|
|
|
- subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
|
|
|
- for i, topic := range commonConfig.SubscribeTopics {
|
|
|
-
|
|
|
- if topic == masterConfig.TopicOfCicvExtend {
|
|
|
- for {
|
|
|
- time.Sleep(time.Duration(3500) * time.Millisecond)
|
|
|
- for _, f := range masterConfig.RuleOfCicvExtend {
|
|
|
- label := f(extendParam)
|
|
|
- if label != "" {
|
|
|
- saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
|
|
|
-
|
|
|
- if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
|
|
|
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- Node: commonConfig.RosNode,
|
|
|
- Topic: topic,
|
|
|
- Callback: func(data *kinglong_msgs.PerceptionLocalization) {
|
|
|
-
|
|
|
- mutexOfCicvLocation.RLock()
|
|
|
- {
|
|
|
- extendParam.VelocityYOfCicvLocation = data.VelocityX
|
|
|
- extendParam.VelocityYOfCicvLocation = data.VelocityY
|
|
|
- extendParam.YawOfCicvLocation = data.Yaw
|
|
|
- extendParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
|
|
|
- }
|
|
|
- mutexOfCicvLocation.RUnlock()
|
|
|
-
|
|
|
- subscribersTimeMutexes[i].Lock()
|
|
|
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
- subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := util.GetNowTimeCustom()
|
|
|
- lastTimeWindow := commonEntity.GetLastTimeWindow()
|
|
|
- var faultLabel string
|
|
|
- for _, f := range masterConfig.RuleOfCicvLocation {
|
|
|
- faultLabel = f(data)
|
|
|
- if faultLabel != "" {
|
|
|
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- subscribersMutexes[i].Unlock()
|
|
|
- }
|
|
|
- subscribersTimeMutexes[i].Unlock()
|
|
|
- },
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- if topic == masterConfig.TopicOfTpperception && len(masterConfig.RuleOfTpperception) > 0 {
|
|
|
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- Node: commonConfig.RosNode,
|
|
|
- Topic: topic,
|
|
|
- Callback: func(data *kinglong_msgs.PerceptionObjects) {
|
|
|
-
|
|
|
- mutexOfTpperception.RLock()
|
|
|
- {
|
|
|
- for _, obj := range data.Objs {
|
|
|
- if (obj.Type != 1 && obj.Type != 0) || obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
|
|
|
- continue
|
|
|
- }
|
|
|
- if _, ok := extendParam.ObjDicOfTpperception[obj.Id]; !ok {
|
|
|
- extendParam.ObjDicOfTpperception[obj.Id] = []float32{}
|
|
|
- }
|
|
|
- extendParam.ObjDicOfTpperception[obj.Id] = append(extendParam.ObjDicOfTpperception[obj.Id], obj.Y)
|
|
|
- }
|
|
|
- }
|
|
|
- mutexOfTpperception.RUnlock()
|
|
|
-
|
|
|
- subscribersTimeMutexes[i].Lock()
|
|
|
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
- subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := util.GetNowTimeCustom()
|
|
|
- lastTimeWindow := commonEntity.GetLastTimeWindow()
|
|
|
- var faultLabel string
|
|
|
- for _, f := range masterConfig.RuleOfTpperception {
|
|
|
- faultLabel = f(data, extendParam)
|
|
|
- if faultLabel != "" {
|
|
|
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- subscribersMutexes[i].Unlock()
|
|
|
- }
|
|
|
- subscribersTimeMutexes[i].Unlock()
|
|
|
-
|
|
|
-
|
|
|
- mutexOfTpperception.RLock()
|
|
|
- {
|
|
|
- for _, obj := range data.Objs {
|
|
|
- extendParam.ObjTypeDicOfTpperception[obj.Id] = obj.Type
|
|
|
- extendParam.ObjSpeedDicOfTpperception[obj.Id] = math.Pow(math.Pow(float64(obj.Vxabs), 2)+math.Pow(float64(obj.Vyabs), 2), 0.5)
|
|
|
- }
|
|
|
- }
|
|
|
- mutexOfTpperception.RUnlock()
|
|
|
-
|
|
|
- },
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- if topic == masterConfig.TopicOfDataRead {
|
|
|
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- Node: commonConfig.RosNode,
|
|
|
- Topic: topic,
|
|
|
- Callback: func(data *kinglong_msgs.Retrieval) {
|
|
|
-
|
|
|
- mutexOfDataRead.RLock()
|
|
|
- {
|
|
|
- extendParam.NumCountDataReadOfDataRead++
|
|
|
- if extendParam.NumCountDataReadOfDataRead == 10 {
|
|
|
- extendParam.EgoSteeringRealOfDataRead = append(extendParam.EgoSteeringRealOfDataRead, data.StrgAngleRealValue)
|
|
|
- extendParam.EgoThrottleRealOfDataRead = append(extendParam.EgoThrottleRealOfDataRead, data.VcuAccelPosValue)
|
|
|
- extendParam.EgoBrakeRealOfDataRead = append(extendParam.EgoBrakeRealOfDataRead, data.VcuBrkPelPosValue)
|
|
|
- extendParam.NumCountDataReadOfDataRead = 0
|
|
|
- }
|
|
|
- }
|
|
|
- mutexOfDataRead.RUnlock()
|
|
|
- subscribersTimeMutexes[i].Lock()
|
|
|
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
- subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := util.GetNowTimeCustom()
|
|
|
- lastTimeWindow := commonEntity.GetLastTimeWindow()
|
|
|
- var faultLabel string
|
|
|
- for _, f := range masterConfig.RuleOfDataRead {
|
|
|
- faultLabel = f(data)
|
|
|
- if faultLabel != "" {
|
|
|
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- subscribersTimes[i] = time.Now()
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- subscribersMutexes[i].Unlock()
|
|
|
- }
|
|
|
- subscribersTimeMutexes[i].Unlock()
|
|
|
- },
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- if topic == masterConfig.TopicOfJinlongControlPub && len(masterConfig.RuleOfJinlongControlPub) > 0 {
|
|
|
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- Node: commonConfig.RosNode,
|
|
|
- Topic: topic,
|
|
|
- Callback: func(data *kinglong_msgs.JinlongControlCommand) {
|
|
|
-
|
|
|
- mutexOfJinlongControlPub.RLock()
|
|
|
- {
|
|
|
- extendParam.NumCountJinlongControlCommandOfPjControlPub++
|
|
|
- if extendParam.NumCountJinlongControlCommandOfPjControlPub == 10 {
|
|
|
- extendParam.EgoSteeringCmdOfJinlongControlPub = append(extendParam.EgoSteeringCmdOfJinlongControlPub, data.ASStrgAngleReq)
|
|
|
- extendParam.EgoThrottleCmdOfJinlongControlPub = append(extendParam.EgoThrottleCmdOfJinlongControlPub, data.ASAutoDAccelPosReq)
|
|
|
- extendParam.EgoBrakeCmdOfJinlongControlPub = append(extendParam.EgoBrakeCmdOfJinlongControlPub, data.ASAutoDBrkPelPosReq)
|
|
|
- extendParam.NumCountJinlongControlCommandOfPjControlPub = 0
|
|
|
- }
|
|
|
- }
|
|
|
- mutexOfJinlongControlPub.RUnlock()
|
|
|
-
|
|
|
- subscribersTimeMutexes[i].Lock()
|
|
|
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
- subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := util.GetNowTimeCustom()
|
|
|
- lastTimeWindow := commonEntity.GetLastTimeWindow()
|
|
|
- var faultLabel string
|
|
|
- for _, f := range masterConfig.RuleOfJinlongControlPub {
|
|
|
- faultLabel = f(data)
|
|
|
- if faultLabel != "" {
|
|
|
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- subscribersMutexes[i].Unlock()
|
|
|
- }
|
|
|
- subscribersTimeMutexes[i].Unlock()
|
|
|
- },
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- if topic == masterConfig.TopicOfFailureLidar && len(masterConfig.RuleOfFailureLidar) > 0 {
|
|
|
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- Node: commonConfig.RosNode,
|
|
|
- Topic: topic,
|
|
|
- Callback: func(data *std_msgs.Bool) {
|
|
|
- subscribersTimeMutexes[i].Lock()
|
|
|
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
- subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := util.GetNowTimeCustom()
|
|
|
- lastTimeWindow := commonEntity.GetLastTimeWindow()
|
|
|
- var faultLabel string
|
|
|
- for _, f := range masterConfig.RuleOfFailureLidar {
|
|
|
- faultLabel = f(data)
|
|
|
- if faultLabel != "" {
|
|
|
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- subscribersMutexes[i].Unlock()
|
|
|
- }
|
|
|
- subscribersTimeMutexes[i].Unlock()
|
|
|
- },
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- if topic == masterConfig.TopicOfFailureRadar && len(masterConfig.RuleOfFailureRadar) > 0 {
|
|
|
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- Node: commonConfig.RosNode,
|
|
|
- Topic: topic,
|
|
|
- Callback: func(data *std_msgs.Bool) {
|
|
|
- subscribersTimeMutexes[i].Lock()
|
|
|
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
- subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := util.GetNowTimeCustom()
|
|
|
- lastTimeWindow := commonEntity.GetLastTimeWindow()
|
|
|
- var faultLabel string
|
|
|
- for _, f := range masterConfig.RuleOfFailureRadar {
|
|
|
- faultLabel = f(data)
|
|
|
- if faultLabel != "" {
|
|
|
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- subscribersMutexes[i].Unlock()
|
|
|
- }
|
|
|
- subscribersTimeMutexes[i].Unlock()
|
|
|
- },
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- if topic == masterConfig.TopicOfFailureCamera && len(masterConfig.RuleOfFailureLidar) > 0 {
|
|
|
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- Node: commonConfig.RosNode,
|
|
|
- Topic: topic,
|
|
|
- Callback: func(data *std_msgs.Bool) {
|
|
|
- subscribersTimeMutexes[i].Lock()
|
|
|
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
- subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := util.GetNowTimeCustom()
|
|
|
- lastTimeWindow := commonEntity.GetLastTimeWindow()
|
|
|
- var faultLabel string
|
|
|
- for _, f := range masterConfig.RuleOfFailureLidar {
|
|
|
- faultLabel = f(data)
|
|
|
- if faultLabel != "" {
|
|
|
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- subscribersMutexes[i].Unlock()
|
|
|
- }
|
|
|
- subscribersTimeMutexes[i].Unlock()
|
|
|
- },
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- if err != nil {
|
|
|
- c_log.GlobalLogger.Info("创建订阅者报错:", err)
|
|
|
-
|
|
|
- continue
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- select {
|
|
|
- case signal := <-service.ChannelKillWindowProducer:
|
|
|
- if signal == 1 {
|
|
|
- commonConfig.RosNode.Close()
|
|
|
- service.AddKillTimes("3")
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
|
|
|
- masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
|
|
|
- if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
|
|
|
-
|
|
|
- newTimeWindow := commonEntity.TimeWindow{
|
|
|
- FaultTime: faultHappenTime,
|
|
|
- TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
|
|
|
- TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
|
|
|
- Length: commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
|
|
|
- Labels: []string{faultLabel},
|
|
|
- MasterTopics: masterTopics,
|
|
|
- SlaveTopics: slaveTopics,
|
|
|
- }
|
|
|
- c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
|
|
|
- commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
|
|
|
- } else {
|
|
|
-
|
|
|
- commonEntity.TimeWindowProducerQueueMutex.RLock()
|
|
|
- defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
|
|
|
-
|
|
|
- maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
|
|
|
- expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
|
|
|
- if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
|
|
|
- lastTimeWindow.TimeWindowEnd = maxEnd
|
|
|
- lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
|
|
|
- } else {
|
|
|
- if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
|
|
|
- lastTimeWindow.TimeWindowEnd = expectEnd
|
|
|
- lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- labels := lastTimeWindow.Labels
|
|
|
- lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
|
|
|
-
|
|
|
- sourceMasterTopics := lastTimeWindow.MasterTopics
|
|
|
- lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
|
|
|
- sourceSlaveTopics := lastTimeWindow.SlaveTopics
|
|
|
- lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
|
|
|
- c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
|
|
|
-
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
|
|
|
-
|
|
|
- var faultCodeTopics []string
|
|
|
- for _, code := range commonConfig.CloudConfig.Triggers {
|
|
|
- if code.Label == faultLabel {
|
|
|
- faultCodeTopics = code.Topics
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- for _, acceptTopic := range faultCodeTopics {
|
|
|
- for _, host := range commonConfig.CloudConfig.Hosts {
|
|
|
- for _, topic := range host.Topics {
|
|
|
- if host.Name == commonConfig.CloudConfig.Hosts[0].Name && acceptTopic == topic {
|
|
|
- masterTopics = append(masterTopics, acceptTopic)
|
|
|
- }
|
|
|
- if host.Name == commonConfig.CloudConfig.Hosts[1].Name && acceptTopic == topic {
|
|
|
- slaveTopics = append(slaveTopics, acceptTopic)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return masterTopics, slaveTopics
|
|
|
-}
|