|
@@ -8,57 +8,351 @@ import (
|
|
|
"cicv-data-closedloop/common/entity"
|
|
|
"cicv-data-closedloop/common/util"
|
|
|
commonUtil "cicv-data-closedloop/common/util"
|
|
|
+ "cicv-data-closedloop/pjibot_delivery_msgs"
|
|
|
"encoding/json"
|
|
|
"github.com/bluenviron/goroslib/v2"
|
|
|
+ "github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs"
|
|
|
+ "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
|
|
|
+ "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
|
|
|
+ "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
|
|
|
"github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
|
|
|
"sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
+var triggerInterval = 3.0 // 每个触发器3秒触发一次
|
|
|
+
|
|
|
// PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
|
|
|
func PrepareTimeWindowProducerQueue() {
|
|
|
|
|
|
var err error
|
|
|
- subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
|
|
|
- subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
|
|
|
- subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
|
|
|
- subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
|
|
|
- for i, topic := range commonConfig.SubscribeTopics {
|
|
|
- c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
|
|
|
- if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
|
|
|
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- Node: commonConfig.RosNode,
|
|
|
- Topic: topic,
|
|
|
- Callback: func(data *std_msgs.UInt8) {
|
|
|
- subscribersTimeMutexes[i].Lock()
|
|
|
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
- subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
- var faultLabel string
|
|
|
- for _, f := range masterConfig.RuleOfObstacleDetection {
|
|
|
- faultLabel = f(data)
|
|
|
- if faultLabel != "" {
|
|
|
- if canCollect() {
|
|
|
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ subscribers := make([]*goroslib.Subscriber, masterConfig.AllTopicsNumber)
|
|
|
+ subscribersTimes := make([]time.Time, masterConfig.AllTopicsNumber)
|
|
|
+ subscribersTimeMutexes := make([]sync.Mutex, masterConfig.AllTopicsNumber)
|
|
|
+ subscribersMutexes := make([]sync.Mutex, masterConfig.AllTopicsNumber)
|
|
|
+ for i, topic := range masterConfig.AllTopics {
|
|
|
+ for {
|
|
|
+ create := false // 判断是否创建成功,用于打印日志
|
|
|
+ // 1
|
|
|
+ if topic == masterConfig.TopicOfDiagnostics && len(masterConfig.RuleOfDiagnostics) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *diagnostic_msgs.DiagnosticArray) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfDiagnostics {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
subscribersTimes[i] = time.Now()
|
|
|
- break
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
}
|
|
|
- subscribersMutexes[i].Unlock()
|
|
|
- }
|
|
|
- subscribersTimeMutexes[i].Unlock()
|
|
|
- },
|
|
|
- })
|
|
|
- }
|
|
|
- if err != nil {
|
|
|
- c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
|
|
|
- //TODO 如何回传日志
|
|
|
- continue
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ create = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 2
|
|
|
+ if topic == masterConfig.TopicOfImu && len(masterConfig.RuleOfImu) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *sensor_msgs.Imu) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfImu {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ create = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 3
|
|
|
+ if topic == masterConfig.TopicOfLocateInfo && len(masterConfig.RuleOfLocateInfo) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *pjibot_delivery_msgs.LocateInfo) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfLocateInfo {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ create = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 4
|
|
|
+ if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *std_msgs.UInt8) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfObstacleDetection {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ create = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 5
|
|
|
+ if topic == masterConfig.TopicOfOdom && len(masterConfig.RuleOfOdom) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *nav_msgs.Odometry) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfOdom {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ create = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 6
|
|
|
+ if topic == masterConfig.TopicOfSysInfo && len(masterConfig.RuleOfSysInfo) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *pjibot_delivery_msgs.SysInfo) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfSysInfo {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ create = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 7
|
|
|
+ if topic == masterConfig.TopicOfRobotPose && len(masterConfig.RuleOfRobotPose) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *geometry_msgs.PoseStamped) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfRobotPose {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ create = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 8
|
|
|
+ if topic == masterConfig.TopicOfTaskFeedbackInfo && len(masterConfig.RuleOfTaskFeedbackInfo) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *pjibot_delivery_msgs.TaskFeedbackInfo) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfTaskFeedbackInfo {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ create = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 9
|
|
|
+ if topic == masterConfig.TopicOfWheelOdom && len(masterConfig.RuleOfWheelOdom) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *nav_msgs.Odometry) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfWheelOdom {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ create = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Infof("创建订阅者报错,可能由于节点未启动,再次尝试【%v】", err)
|
|
|
+ time.Sleep(time.Duration(2) * time.Second)
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ if create {
|
|
|
+ c_log.GlobalLogger.Infof("创建订阅者订阅话题【%v】", topic)
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+ c_log.GlobalLogger.Infof("全部订阅者创建完成。")
|
|
|
select {
|
|
|
case signal := <-commonService.ChannelKillSubscriber:
|
|
|
if signal == 1 {
|
|
@@ -148,9 +442,8 @@ func canCollect() bool {
|
|
|
return false
|
|
|
}
|
|
|
if resp.Code != 200 { // 不是200 代表不允许采集
|
|
|
- c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
|
|
|
+ c_log.GlobalLogger.Infof("当前周期内采集数量已超过限额。%+v", resp)
|
|
|
return false
|
|
|
}
|
|
|
- c_log.GlobalLogger.Info("允许采集。")
|
|
|
return true
|
|
|
}
|