|
@@ -24,38 +24,48 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
|
|
|
subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
|
|
|
for i, topic := range commonConfig.SubscribeTopics {
|
|
|
- c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
|
|
|
- if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
|
|
|
- subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
- Node: commonConfig.RosNode,
|
|
|
- Topic: topic,
|
|
|
- Callback: func(data *std_msgs.UInt8) {
|
|
|
- subscribersTimeMutexes[i].Lock()
|
|
|
- if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
- subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
- var faultLabel string
|
|
|
- for _, f := range masterConfig.RuleOfObstacleDetection {
|
|
|
- faultLabel = f(data)
|
|
|
- if faultLabel != "" {
|
|
|
- if canCollect() {
|
|
|
- saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- subscribersTimes[i] = time.Now()
|
|
|
- break
|
|
|
+ for {
|
|
|
+ create := false // 判断是否创建成功,用于打印日志
|
|
|
+ if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
|
|
|
+ subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
+ Node: commonConfig.RosNode,
|
|
|
+ Topic: topic,
|
|
|
+ Callback: func(data *std_msgs.UInt8) {
|
|
|
+ subscribersTimeMutexes[i].Lock()
|
|
|
+ if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
+ subscribersMutexes[i].Lock()
|
|
|
+ faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ var faultLabel string
|
|
|
+ for _, f := range masterConfig.RuleOfObstacleDetection {
|
|
|
+ faultLabel = f(data)
|
|
|
+ if faultLabel != "" {
|
|
|
+ if canCollect() {
|
|
|
+ saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
+ subscribersTimes[i] = time.Now()
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ subscribersMutexes[i].Unlock()
|
|
|
}
|
|
|
- subscribersMutexes[i].Unlock()
|
|
|
- }
|
|
|
- subscribersTimeMutexes[i].Unlock()
|
|
|
- },
|
|
|
- })
|
|
|
- }
|
|
|
- if err != nil {
|
|
|
- c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
|
|
|
- //TODO 如何回传日志
|
|
|
- continue
|
|
|
+ subscribersTimeMutexes[i].Unlock()
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if err == nil {
|
|
|
+ create = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Infof("创建订阅者报错,可能由于节点未启动,再次尝试【%v】", err)
|
|
|
+ time.Sleep(time.Duration(2) * time.Second)
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ if create {
|
|
|
+ c_log.GlobalLogger.Infof("创建订阅者订阅话题【%v】", topic)
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|