|
@@ -8,8 +8,13 @@ import (
|
|
|
"cicv-data-closedloop/common/entity"
|
|
|
"cicv-data-closedloop/common/util"
|
|
|
commonUtil "cicv-data-closedloop/common/util"
|
|
|
+ "cicv-data-closedloop/pjibot_delivery_msgs"
|
|
|
"encoding/json"
|
|
|
"github.com/bluenviron/goroslib/v2"
|
|
|
+ "github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs"
|
|
|
+ "github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
|
|
|
+ "github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
|
|
|
+ "github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
|
|
|
"github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
|
|
|
"sync"
|
|
|
"time"
|
|
@@ -25,6 +30,100 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
|
|
|
for i, topic := range commonConfig.SubscribeTopics {
|
|
|
c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
|
|
|
+ // 1
|
|
|
+ if topic == masterConfig.TopicOfDiagnostics && len(masterConfig.RuleOfDiagnostics) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *diagnostic_msgs.DiagnosticArray) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfDiagnostics {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ // 2
|
|
|
+ if topic == masterConfig.TopicOfImu && len(masterConfig.RuleOfImu) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *sensor_msgs.Imu) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfImu {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ // 3
|
|
|
+ if topic == masterConfig.TopicOfLocateInfo && len(masterConfig.RuleOfLocateInfo) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *pjibot_delivery_msgs.LocateInfo) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfLocateInfo {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ // 4
|
|
|
if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
|
|
|
subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
Node: commonConfig.RosNode,
|
|
@@ -55,6 +154,162 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
},
|
|
|
})
|
|
|
}
|
|
|
+ // 5
|
|
|
+ if topic == masterConfig.TopicOfOdom && len(masterConfig.RuleOfOdom) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *nav_msgs.Odometry) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfOdom {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ // 6
|
|
|
+ if topic == masterConfig.TopicOfSysInfo && len(masterConfig.RuleOfSysInfo) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *pjibot_delivery_msgs.SysInfo) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfSysInfo {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ // 7
|
|
|
+ if topic == masterConfig.TopicOfRobotPose && len(masterConfig.RuleOfRobotPose) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *geometry_msgs.PoseStamped) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfRobotPose {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ // 8
|
|
|
+ if topic == masterConfig.TopicOfTaskFeedbackInfo && len(masterConfig.RuleOfTaskFeedbackInfo) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *pjibot_delivery_msgs.TaskFeedbackInfo) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfTaskFeedbackInfo {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ // 9
|
|
|
+ if topic == masterConfig.TopicOfWheelOdom && len(masterConfig.RuleOfWheelOdom) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *nav_msgs.Odometry) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfWheelOdom {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ if canCollect() {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
+ }
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
if err != nil {
|
|
|
c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
|
|
|
continue
|