|
@@ -9,6 +9,8 @@ import (
|
|
|
"cicv-data-closedloop/common/util"
|
|
|
"cicv-data-closedloop/kinglong_msgs"
|
|
|
"github.com/bluenviron/goroslib/v2"
|
|
|
+ "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
|
|
|
+ "math"
|
|
|
"sync"
|
|
|
"time"
|
|
|
)
|
|
@@ -20,7 +22,9 @@ var (
|
|
|
// /tpperception
|
|
|
mutexOfTpperception sync.RWMutex
|
|
|
// /pj_control_pub
|
|
|
- mutexOfPjControlPub sync.RWMutex
|
|
|
+ mutexOfJinlongControlPub sync.RWMutex
|
|
|
+ // /data_read
|
|
|
+ mutexOfDataRead sync.RWMutex
|
|
|
)
|
|
|
|
|
|
// PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
|
|
@@ -33,7 +37,7 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
|
|
|
subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
|
|
|
for i, topic := range commonConfig.SubscribeTopics {
|
|
|
- // !!!扩展的定时任务监听,牛逼的设计!!!扩展性拉满啦
|
|
|
+ // !!!扩展的定时任务监听,牛逼!!!
|
|
|
if topic == masterConfig.TopicOfCicvExtend {
|
|
|
for {
|
|
|
time.Sleep(time.Duration(3500) * time.Millisecond)
|
|
@@ -48,6 +52,7 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
}
|
|
|
|
|
|
c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
|
|
|
+
|
|
|
if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
|
|
|
subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
Node: commonConfig.RosNode,
|
|
@@ -88,6 +93,21 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
Node: commonConfig.RosNode,
|
|
|
Topic: topic,
|
|
|
Callback: func(data *kinglong_msgs.PerceptionObjects) {
|
|
|
+ // 更新共享变量
|
|
|
+ mutexOfTpperception.RLock()
|
|
|
+ {
|
|
|
+ for _, obj := range data.Objs {
|
|
|
+ if (obj.Type != 1 && obj.Type != 0) || obj.X <= 5 || math.Abs(float64(obj.Y)) >= 10 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if _, ok := extendParam.ObjDicOfTpperception[obj.Id]; !ok {
|
|
|
+ extendParam.ObjDicOfTpperception[obj.Id] = []float32{}
|
|
|
+ }
|
|
|
+ extendParam.ObjDicOfTpperception[obj.Id] = append(extendParam.ObjDicOfTpperception[obj.Id], obj.Y)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mutexOfTpperception.RUnlock()
|
|
|
+
|
|
|
subscribersTimeMutexes[i].Lock()
|
|
|
if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
subscribersMutexes[i].Lock()
|
|
@@ -108,11 +128,24 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+ // 5
|
|
|
if topic == masterConfig.TopicOfDataRead {
|
|
|
subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
Node: commonConfig.RosNode,
|
|
|
Topic: topic,
|
|
|
Callback: func(data *kinglong_msgs.Retrieval) {
|
|
|
+ // 更新共享变量
|
|
|
+ mutexOfDataRead.RLock()
|
|
|
+ {
|
|
|
+ extendParam.NumCountDataReadOfDataRead++
|
|
|
+ if extendParam.NumCountDataReadOfDataRead == 10 {
|
|
|
+ extendParam.EgoSteeringRealOfDataRead = append(extendParam.EgoSteeringRealOfDataRead, data.StrgAngleRealValue)
|
|
|
+ extendParam.EgoThrottleRealOfDataRead = append(extendParam.EgoThrottleRealOfDataRead, data.VcuAccelPosValue)
|
|
|
+ extendParam.EgoBrakeRealOfDataRead = append(extendParam.EgoBrakeRealOfDataRead, data.VcuBrkPelPosValue)
|
|
|
+ extendParam.NumCountDataReadOfDataRead = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mutexOfDataRead.RUnlock()
|
|
|
subscribersTimeMutexes[i].Lock()
|
|
|
if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
subscribersMutexes[i].Lock()
|
|
@@ -134,6 +167,123 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+ // 6
|
|
|
+ if topic == masterConfig.TopicOfJinlongControlPub && len(masterConfig.RuleOfJinlongControlPub) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *kinglong_msgs.JinlongControlCommand) {
|
|
|
+ // 更新共享变量
|
|
|
+ mutexOfJinlongControlPub.RLock()
|
|
|
+ {
|
|
|
+ extendParam.NumCountJinlongControlCommandOfPjControlPub++
|
|
|
+ if extendParam.NumCountJinlongControlCommandOfPjControlPub == 10 {
|
|
|
+ extendParam.EgoSteeringCmdOfJinlongControlPub = append(extendParam.EgoSteeringCmdOfJinlongControlPub, data.ASStrgAngleReq)
|
|
|
+ extendParam.EgoThrottleCmdOfJinlongControlPub = append(extendParam.EgoThrottleCmdOfJinlongControlPub, data.ASAutoDAccelPosReq)
|
|
|
+ extendParam.EgoBrakeCmdOfJinlongControlPub = append(extendParam.EgoBrakeCmdOfJinlongControlPub, data.ASAutoDBrkPelPosReq)
|
|
|
+ extendParam.NumCountJinlongControlCommandOfPjControlPub = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mutexOfJinlongControlPub.RUnlock()
|
|
|
+
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfJinlongControlPub {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ // 7
|
|
|
+ if topic == masterConfig.TopicOfFailureLidar && len(masterConfig.RuleOfFailureLidar) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *std_msgs.Bool) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfFailureLidar {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ // 8
|
|
|
+ if topic == masterConfig.TopicOfFailureRadar && len(masterConfig.RuleOfFailureRadar) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *std_msgs.Bool) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfFailureRadar {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ // 9
|
|
|
+ if topic == masterConfig.TopicOfFailureCamera && len(masterConfig.RuleOfFailureLidar) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *std_msgs.Bool) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfFailureLidar {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
if err != nil {
|
|
|
c_log.GlobalLogger.Info("创建订阅者报错:", err)
|
|
|
//TODO 如何回传日志
|
|
@@ -141,180 +291,6 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- ////创建订阅者0订阅主题 nodefault_info
|
|
|
- //c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfNodeFaultInfo)
|
|
|
- //subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- // Node: commonConfig.RosNode,
|
|
|
- // Topic: masterConfig.TopicOfNodeFaultInfo,
|
|
|
- // Callback: func(data *kinglong_msgs.FaultInfo) {
|
|
|
- // if len(masterConfig.RuleOfNodefaultInfo) == 0 {
|
|
|
- // //c_log.GlobalLogger.Info("话题 nodefault_info没有触发器")
|
|
|
- // return
|
|
|
- // }
|
|
|
- // commonEntity.Subscriber0TimeMutex.Lock()
|
|
|
- // if time.Since(commonEntity.Subscriber0Time).Seconds() > 1 {
|
|
|
- // commonEntity.Subscriber0TimeMutex.Unlock()
|
|
|
- // subscriber0Mutex.Lock()
|
|
|
- // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
- // var faultLabel string
|
|
|
- // for _, f := range masterConfig.RuleOfNodefaultInfo {
|
|
|
- // faultLabel = f(data)
|
|
|
- // if faultLabel != "" {
|
|
|
- // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- // commonEntity.Subscriber0Time = time.Now()
|
|
|
- // break
|
|
|
- // }
|
|
|
- // }
|
|
|
- // subscriber0Mutex.Unlock()
|
|
|
- // }
|
|
|
- // commonEntity.Subscriber0TimeMutex.Unlock()
|
|
|
- // }})
|
|
|
- //if err != nil {
|
|
|
- // c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
|
|
|
- // os.Exit(-1)
|
|
|
- //}
|
|
|
- //// 创建订阅者1订阅主题 cicv_location
|
|
|
- //c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfCicvLocation)
|
|
|
- //subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- // Node: commonConfig.RosNode,
|
|
|
- // Topic: masterConfig.TopicOfCicvLocation,
|
|
|
- // Callback: func(data *kinglong_msgs.PerceptionLocalization) {
|
|
|
- // m.RLock()
|
|
|
- // velocityX = data.VelocityX
|
|
|
- // velocityY = data.VelocityY
|
|
|
- // yaw = data.Yaw
|
|
|
- // m.RUnlock()
|
|
|
- //
|
|
|
- // if len(masterConfig.RuleOfCicvLocation) == 0 {
|
|
|
- // c_log.GlobalLogger.Info("话题 cicv_location 没有触发器")
|
|
|
- // return
|
|
|
- // }
|
|
|
- // commonEntity.Subscriber1TimeMutex.Lock()
|
|
|
- // if time.Since(commonEntity.Subscriber1Time).Seconds() > 1 {
|
|
|
- // subscriber1Mutex.Lock()
|
|
|
- // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
- // // 更新共享变量
|
|
|
- // var faultLabel string
|
|
|
- // for _, f := range masterConfig.RuleOfCicvLocation {
|
|
|
- // faultLabel = f(data)
|
|
|
- // if faultLabel != "" {
|
|
|
- // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- // commonEntity.Subscriber1Time = time.Now()
|
|
|
- // break
|
|
|
- // }
|
|
|
- // }
|
|
|
- // subscriber1Mutex.Unlock()
|
|
|
- // }
|
|
|
- // commonEntity.Subscriber1TimeMutex.Unlock()
|
|
|
- // },
|
|
|
- //})
|
|
|
- //if err != nil {
|
|
|
- // c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
|
|
|
- // os.Exit(-1)
|
|
|
- //}
|
|
|
- //c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfTpperception)
|
|
|
- //subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- // Node: commonConfig.RosNode,
|
|
|
- // Topic: masterConfig.TopicOfTpperception,
|
|
|
- // Callback: func(data *kinglong_msgs.PerceptionObjects) {
|
|
|
- // if len(masterConfig.RuleOfTpperception) == 0 {
|
|
|
- // c_log.GlobalLogger.Info("话题 tpperception 没有触发器")
|
|
|
- // return
|
|
|
- // }
|
|
|
- // commonEntity.Subscriber2TimeMutex.Lock()
|
|
|
- // // 判断是否是连续故障码
|
|
|
- // if time.Since(commonEntity.Subscriber2Time).Seconds() > 1 {
|
|
|
- // // 2 不是连续故障码
|
|
|
- // subscriber2Mutex.Lock()
|
|
|
- // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
- // var faultLabel string
|
|
|
- // //c_log.GlobalLogger.Infof("TTC数据为:【velocityX】=%v,【velocityY】=%v", velocityX, velocityY)
|
|
|
- // for _, f := range masterConfig.RuleOfTpperception {
|
|
|
- // faultLabel = f(data, velocityX, velocityY, yaw)
|
|
|
- // if faultLabel != "" {
|
|
|
- // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- // commonEntity.Subscriber2Time = time.Now()
|
|
|
- // break
|
|
|
- // }
|
|
|
- // }
|
|
|
- // subscriber2Mutex.Unlock()
|
|
|
- // }
|
|
|
- // commonEntity.Subscriber2TimeMutex.Unlock()
|
|
|
- // }})
|
|
|
- //if err != nil {
|
|
|
- // c_log.GlobalLogger.Info("创建订阅者2发生故障:", err)
|
|
|
- // os.Exit(-1)
|
|
|
- //}
|
|
|
- //// 创建订阅者3订阅主题 fault_info
|
|
|
- //c_log.GlobalLogger.Info("创建订阅者3订阅话题 ", masterConfig.TopicOfFaultInfo)
|
|
|
- //subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- // Node: commonConfig.RosNode,
|
|
|
- // Topic: masterConfig.TopicOfFaultInfo,
|
|
|
- // Callback: func(data *kinglong_msgs.FaultVec) {
|
|
|
- // if len(masterConfig.RuleOfFaultInfo) == 0 {
|
|
|
- // c_log.GlobalLogger.Info("话题 fault_info 没有触发器")
|
|
|
- // return
|
|
|
- // }
|
|
|
- // commonEntity.Subscriber3TimeMutex.Lock()
|
|
|
- // if time.Since(commonEntity.Subscriber3Time).Seconds() > 1 {
|
|
|
- // // 2 不是连续故障码
|
|
|
- // subscriber3Mutex.Lock()
|
|
|
- // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
- // var faultLabel string
|
|
|
- // for _, f := range masterConfig.RuleOfFaultInfo {
|
|
|
- // faultLabel = f(data)
|
|
|
- // if faultLabel != "" {
|
|
|
- // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- // commonEntity.Subscriber3Time = time.Now()
|
|
|
- // break
|
|
|
- // }
|
|
|
- // }
|
|
|
- // subscriber3Mutex.Unlock()
|
|
|
- // }
|
|
|
- // commonEntity.Subscriber3TimeMutex.Unlock()
|
|
|
- // }})
|
|
|
- //if err != nil {
|
|
|
- // c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
|
|
|
- // os.Exit(-1)
|
|
|
- //}
|
|
|
- //// 创建订阅者4订阅主题 data_read
|
|
|
- //// TODO 高频率触发
|
|
|
- //c_log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead)
|
|
|
- //subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- // Node: commonConfig.RosNode,
|
|
|
- // Topic: masterConfig.TopicOfDataRead,
|
|
|
- // Callback: func(data *kinglong_msgs.Retrieval) {
|
|
|
- // if len(masterConfig.RuleOfDataRead) == 0 {
|
|
|
- // //c_log.GlobalLogger.Info("话题 data_read 没有触发器")
|
|
|
- // return
|
|
|
- // }
|
|
|
- // commonEntity.Subscriber4TimeMutex.Lock()
|
|
|
- // if time.Since(commonEntity.Subscriber4Time).Seconds() > 1 {
|
|
|
- // subscriber4Mutex.Lock()
|
|
|
- // faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
- // var faultLabel string
|
|
|
- // for _, f := range masterConfig.RuleOfDataRead {
|
|
|
- // faultLabel = f(data)
|
|
|
- // if faultLabel != "" {
|
|
|
- // saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- // commonEntity.Subscriber4Time = time.Now()
|
|
|
- // break
|
|
|
- // }
|
|
|
- // }
|
|
|
- // subscriber4Mutex.Unlock()
|
|
|
- // }
|
|
|
- // commonEntity.Subscriber4TimeMutex.Unlock()
|
|
|
- // },
|
|
|
- //})
|
|
|
- //if err != nil {
|
|
|
- // c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
|
|
|
- // os.Exit(-1)
|
|
|
- //}
|
|
|
select {
|
|
|
case signal := <-service.ChannelKillWindowProducer:
|
|
|
if signal == 1 {
|