Bläddra i källkod

Merge remote-tracking branch 'origin/master'

LingxinMeng 9 månader sedan
förälder
incheckning
0247afcf7e

+ 291 - 256
aarch64/pjibot_delivery/master/package/service/produce_window.go

@@ -29,290 +29,325 @@ func PrepareTimeWindowProducerQueue() {
 	subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
 	subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
 	for i, topic := range commonConfig.SubscribeTopics {
-		c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
-		// 1
-		if topic == masterConfig.TopicOfDiagnostics && len(masterConfig.RuleOfDiagnostics) > 0 {
-			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
-				Node:  commonConfig.RosNode,
-				Topic: topic,
-				Callback: func(data *diagnostic_msgs.DiagnosticArray) {
-					subscribersTimeMutexes[i].Lock()
-					if time.Since(subscribersTimes[i]).Seconds() > 1 {
-						subscribersMutexes[i].Lock()
-						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
-						var faultLabel string
-						for _, f := range masterConfig.RuleOfDiagnostics {
-							faultLabel = f(data)
-							if faultLabel != "" {
-								if canCollect() {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-									subscribersTimes[i] = time.Now()
-									break
-								} else {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+		for {
+			create := false // 判断是否创建成功,用于打印日志
+			// 1
+			if topic == masterConfig.TopicOfDiagnostics && len(masterConfig.RuleOfDiagnostics) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *diagnostic_msgs.DiagnosticArray) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfDiagnostics {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
 								}
 							}
+							subscribersMutexes[i].Unlock()
 						}
-						subscribersMutexes[i].Unlock()
-					}
-					subscribersTimeMutexes[i].Unlock()
-				},
-			})
-		}
-		// 2
-		if topic == masterConfig.TopicOfImu && len(masterConfig.RuleOfImu) > 0 {
-			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
-				Node:  commonConfig.RosNode,
-				Topic: topic,
-				Callback: func(data *sensor_msgs.Imu) {
-					subscribersTimeMutexes[i].Lock()
-					if time.Since(subscribersTimes[i]).Seconds() > 1 {
-						subscribersMutexes[i].Lock()
-						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
-						var faultLabel string
-						for _, f := range masterConfig.RuleOfImu {
-							faultLabel = f(data)
-							if faultLabel != "" {
-								if canCollect() {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-									subscribersTimes[i] = time.Now()
-									break
-								} else {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 2
+			if topic == masterConfig.TopicOfImu && len(masterConfig.RuleOfImu) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *sensor_msgs.Imu) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfImu {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
 								}
 							}
+							subscribersMutexes[i].Unlock()
 						}
-						subscribersMutexes[i].Unlock()
-					}
-					subscribersTimeMutexes[i].Unlock()
-				},
-			})
-		}
-		// 3
-		if topic == masterConfig.TopicOfLocateInfo && len(masterConfig.RuleOfLocateInfo) > 0 {
-			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
-				Node:  commonConfig.RosNode,
-				Topic: topic,
-				Callback: func(data *pjibot_delivery_msgs.LocateInfo) {
-					subscribersTimeMutexes[i].Lock()
-					if time.Since(subscribersTimes[i]).Seconds() > 1 {
-						subscribersMutexes[i].Lock()
-						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
-						var faultLabel string
-						for _, f := range masterConfig.RuleOfLocateInfo {
-							faultLabel = f(data)
-							if faultLabel != "" {
-								if canCollect() {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-									subscribersTimes[i] = time.Now()
-									break
-								} else {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 3
+			if topic == masterConfig.TopicOfLocateInfo && len(masterConfig.RuleOfLocateInfo) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *pjibot_delivery_msgs.LocateInfo) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfLocateInfo {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
 								}
 							}
+							subscribersMutexes[i].Unlock()
 						}
-						subscribersMutexes[i].Unlock()
-					}
-					subscribersTimeMutexes[i].Unlock()
-				},
-			})
-		}
-		// 4
-		if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
-			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
-				Node:  commonConfig.RosNode,
-				Topic: topic,
-				Callback: func(data *std_msgs.UInt8) {
-					subscribersTimeMutexes[i].Lock()
-					if time.Since(subscribersTimes[i]).Seconds() > 1 {
-						subscribersMutexes[i].Lock()
-						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
-						var faultLabel string
-						for _, f := range masterConfig.RuleOfObstacleDetection {
-							faultLabel = f(data)
-							if faultLabel != "" {
-								if canCollect() {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-									subscribersTimes[i] = time.Now()
-									break
-								} else {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 4
+			if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *std_msgs.UInt8) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfObstacleDetection {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
 								}
 							}
+							subscribersMutexes[i].Unlock()
 						}
-						subscribersMutexes[i].Unlock()
-					}
-					subscribersTimeMutexes[i].Unlock()
-				},
-			})
-		}
-		// 5
-		if topic == masterConfig.TopicOfOdom && len(masterConfig.RuleOfOdom) > 0 {
-			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
-				Node:  commonConfig.RosNode,
-				Topic: topic,
-				Callback: func(data *nav_msgs.Odometry) {
-					subscribersTimeMutexes[i].Lock()
-					if time.Since(subscribersTimes[i]).Seconds() > 1 {
-						subscribersMutexes[i].Lock()
-						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
-						var faultLabel string
-						for _, f := range masterConfig.RuleOfOdom {
-							faultLabel = f(data)
-							if faultLabel != "" {
-								if canCollect() {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-									subscribersTimes[i] = time.Now()
-									break
-								} else {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 5
+			if topic == masterConfig.TopicOfOdom && len(masterConfig.RuleOfOdom) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *nav_msgs.Odometry) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfOdom {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
 								}
 							}
+							subscribersMutexes[i].Unlock()
 						}
-						subscribersMutexes[i].Unlock()
-					}
-					subscribersTimeMutexes[i].Unlock()
-				},
-			})
-		}
-		// 6
-		if topic == masterConfig.TopicOfSysInfo && len(masterConfig.RuleOfSysInfo) > 0 {
-			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
-				Node:  commonConfig.RosNode,
-				Topic: topic,
-				Callback: func(data *pjibot_delivery_msgs.SysInfo) {
-					subscribersTimeMutexes[i].Lock()
-					if time.Since(subscribersTimes[i]).Seconds() > 1 {
-						subscribersMutexes[i].Lock()
-						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
-						var faultLabel string
-						for _, f := range masterConfig.RuleOfSysInfo {
-							faultLabel = f(data)
-							if faultLabel != "" {
-								if canCollect() {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-									subscribersTimes[i] = time.Now()
-									break
-								} else {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 6
+			if topic == masterConfig.TopicOfSysInfo && len(masterConfig.RuleOfSysInfo) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *pjibot_delivery_msgs.SysInfo) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfSysInfo {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
 								}
 							}
+							subscribersMutexes[i].Unlock()
 						}
-						subscribersMutexes[i].Unlock()
-					}
-					subscribersTimeMutexes[i].Unlock()
-				},
-			})
-		}
-		// 7
-		if topic == masterConfig.TopicOfRobotPose && len(masterConfig.RuleOfRobotPose) > 0 {
-			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
-				Node:  commonConfig.RosNode,
-				Topic: topic,
-				Callback: func(data *geometry_msgs.PoseStamped) {
-					subscribersTimeMutexes[i].Lock()
-					if time.Since(subscribersTimes[i]).Seconds() > 1 {
-						subscribersMutexes[i].Lock()
-						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
-						var faultLabel string
-						for _, f := range masterConfig.RuleOfRobotPose {
-							faultLabel = f(data)
-							if faultLabel != "" {
-								if canCollect() {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-									subscribersTimes[i] = time.Now()
-									break
-								} else {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 7
+			if topic == masterConfig.TopicOfRobotPose && len(masterConfig.RuleOfRobotPose) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *geometry_msgs.PoseStamped) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfRobotPose {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
 								}
 							}
+							subscribersMutexes[i].Unlock()
 						}
-						subscribersMutexes[i].Unlock()
-					}
-					subscribersTimeMutexes[i].Unlock()
-				},
-			})
-		}
-		// 8
-		if topic == masterConfig.TopicOfTaskFeedbackInfo && len(masterConfig.RuleOfTaskFeedbackInfo) > 0 {
-			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
-				Node:  commonConfig.RosNode,
-				Topic: topic,
-				Callback: func(data *pjibot_delivery_msgs.TaskFeedbackInfo) {
-					subscribersTimeMutexes[i].Lock()
-					if time.Since(subscribersTimes[i]).Seconds() > 1 {
-						subscribersMutexes[i].Lock()
-						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
-						var faultLabel string
-						for _, f := range masterConfig.RuleOfTaskFeedbackInfo {
-							faultLabel = f(data)
-							if faultLabel != "" {
-								if canCollect() {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-									subscribersTimes[i] = time.Now()
-									break
-								} else {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 8
+			if topic == masterConfig.TopicOfTaskFeedbackInfo && len(masterConfig.RuleOfTaskFeedbackInfo) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *pjibot_delivery_msgs.TaskFeedbackInfo) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfTaskFeedbackInfo {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
 								}
 							}
+							subscribersMutexes[i].Unlock()
 						}
-						subscribersMutexes[i].Unlock()
-					}
-					subscribersTimeMutexes[i].Unlock()
-				},
-			})
-		}
-		// 9
-		if topic == masterConfig.TopicOfWheelOdom && len(masterConfig.RuleOfWheelOdom) > 0 {
-			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
-				Node:  commonConfig.RosNode,
-				Topic: topic,
-				Callback: func(data *nav_msgs.Odometry) {
-					subscribersTimeMutexes[i].Lock()
-					if time.Since(subscribersTimes[i]).Seconds() > 1 {
-						subscribersMutexes[i].Lock()
-						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
-						var faultLabel string
-						for _, f := range masterConfig.RuleOfWheelOdom {
-							faultLabel = f(data)
-							if faultLabel != "" {
-								if canCollect() {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-									subscribersTimes[i] = time.Now()
-									break
-								} else {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 9
+			if topic == masterConfig.TopicOfWheelOdom && len(masterConfig.RuleOfWheelOdom) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *nav_msgs.Odometry) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfWheelOdom {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
 								}
 							}
+							subscribersMutexes[i].Unlock()
 						}
-						subscribersMutexes[i].Unlock()
-					}
-					subscribersTimeMutexes[i].Unlock()
-				},
-			})
-		}
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
 
-		if err != nil {
-			c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
-			continue
+			if err != nil {
+				c_log.GlobalLogger.Infof("创建订阅者报错,可能由于节点未启动,再次尝试【%v】", err)
+				time.Sleep(time.Duration(2) * time.Second)
+				continue
+			} else {
+				if create {
+					c_log.GlobalLogger.Infof("创建订阅者订阅话题【%v】", topic)
+				}
+				break
+			}
 		}
 	}
 

+ 2 - 1
aarch64/pjibot_guide/common/service/disk_clean.go

@@ -79,7 +79,8 @@ func getIndexToRemoveForLRU() int {
 	for i >= 0 {
 		for i2, window := range entity.TimeWindowConsumerQueue {
 			for _, label := range window.Labels {
-				if masterConfig.LabelMapTriggerId[label] == lru[i] {
+				value, _ := masterConfig.LabelMapTriggerId.Load(label)
+				if value == lru[i] {
 					return i2
 				}
 			}

+ 3 - 1
aarch64/pjibot_guide/common/service/rosbag_upload.go

@@ -124,7 +124,9 @@ outLoop:
 		// 在上传完成的包目录同级下添加一个目录同名的json
 		triggerIds := make([]string, 0)
 		for _, label := range currentTimeWindow.Labels {
-			triggerIds = append(triggerIds, masterConfig.LabelMapTriggerId[label])
+			if value, ok := masterConfig.LabelMapTriggerId.Load(label); ok {
+				triggerIds = append(triggerIds, value.(string))
+			}
 		}
 		callBackMap := map[string]interface{}{
 			"dataName":    currentTimeWindow.FaultTime, // 云端callback程序会将该值加8小时,因为UTC和CSV时区相差8小时

+ 0 - 140
aarch64/pjibot_guide/master/package/config/master_trigger_cfg.go

@@ -1,140 +0,0 @@
-package config
-
-import (
-	"cicv-data-closedloop/aarch64/pjibot_guide/common/config"
-	"cicv-data-closedloop/common/config/c_log"
-	"cicv-data-closedloop/common/util"
-	pji_msgs "cicv-data-closedloop/pjibot_guide_msgs"
-	"github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs"
-	"github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
-	"github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
-	"github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
-	"plugin"
-	"strconv"
-)
-
-var (
-	LabelMapTriggerId = make(map[string]string)
-
-	// 1
-	TopicOfDiagnostics = "/diagnostics"
-	RuleOfDiagnostics  []func(data *diagnostic_msgs.DiagnosticArray) string
-	// 2
-	TopicOfImu = "/imu"
-	RuleOfImu  []func(data *sensor_msgs.Imu) string
-	// 3
-	TopicOfLocateInfo = "/locate_info"
-	RuleOfLocateInfo  []func(data *pji_msgs.LocateInfo) string
-	// 4
-	TopicOfObstacleDetection = "/obstacle_detection"
-	RuleOfObstacleDetection  []func(data *std_msgs.UInt8) string
-	// 5
-	TopicOfOdom = "/odom"
-	RuleOfOdom  []func(data *nav_msgs.Odometry) string
-	// 6
-	TopicOfSysInfo = "/sys_info"
-	RuleOfSysInfo  []func(data *pji_msgs.SysInfo) string
-)
-
-func InitTriggerConfig() {
-	loadSuccess := 0
-	// 下载所有触发器的文件
-	for _, trigger := range config.PlatformConfig.TaskTriggers {
-		// 下载
-		triggerLocalPath := config.CloudConfig.TriggersDir + trigger.TriggerScriptPath
-		_ = util.CreateParentDir(triggerLocalPath)
-		c_log.GlobalLogger.Info("下载触发器插件从", trigger.TriggerScriptPath, "到", triggerLocalPath)
-		config.OssMutex.Lock()
-		err := config.OssBucket.GetObjectToFile(trigger.TriggerScriptPath, triggerLocalPath)
-		config.OssMutex.Unlock()
-		if err != nil {
-			c_log.GlobalLogger.Errorf("下载oss上的触发器插件失败【%v】->【%v】:%v", trigger.TriggerScriptPath, triggerLocalPath, err)
-			continue
-		}
-		// 载入插件到数组
-		open, err := plugin.Open(triggerLocalPath)
-		if err != nil {
-			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "失败。", err)
-		}
-		topic0, err := open.Lookup("Topic")
-		if err != nil {
-			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Topic方法失败。", err)
-			continue
-		}
-		topic1, ok := topic0.(func() string)
-		if ok != true {
-			c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func() string):", err)
-			continue
-		}
-		topic2 := topic1()
-		rule, err := open.Lookup("Rule")
-		if err != nil {
-			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Rule方法失败。", err)
-			continue
-		}
-		// 判断topic
-		// todo 如果是未知的topic,可以添加一个循环,更新平台配置,同理金龙车和多功能车
-		if TopicOfDiagnostics == topic2 { // 1
-			f, ok := rule.(func(*diagnostic_msgs.DiagnosticArray) string)
-			if ok != true {
-				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *diagnostic_msgs.DiagnosticArray) string):", err)
-				continue
-			}
-			RuleOfDiagnostics = append(RuleOfDiagnostics, f)
-		} else if TopicOfImu == topic2 { // 2
-			f, ok := rule.(func(data *sensor_msgs.Imu) string)
-			if ok != true {
-				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *sensor_msgs.Imu) string):", err)
-				continue
-			}
-			RuleOfImu = append(RuleOfImu, f)
-		} else if TopicOfLocateInfo == topic2 { // 3
-			f, ok := rule.(func(data *pji_msgs.LocateInfo) string)
-			if ok != true {
-				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *pji_msgs.LocateInfo) string):", err)
-				continue
-			}
-			RuleOfLocateInfo = append(RuleOfLocateInfo, f)
-		} else if TopicOfObstacleDetection == topic2 { // 4
-			f, ok := rule.(func(data *std_msgs.UInt8) string)
-			if ok != true {
-				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *std_msgs.UInt8) string):", err)
-				continue
-			}
-			RuleOfObstacleDetection = append(RuleOfObstacleDetection, f)
-		} else if TopicOfOdom == topic2 { // 5
-			f, ok := rule.(func(data *nav_msgs.Odometry) string)
-			if ok != true {
-				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *nav_msgs.Odometry) string):", err)
-				continue
-			}
-			RuleOfOdom = append(RuleOfOdom, f)
-		} else if TopicOfSysInfo == topic2 { // 6
-			f, ok := rule.(func(data *pji_msgs.SysInfo) string)
-			if ok != true {
-				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *pji_msgs.SysInfo) string):", err)
-				continue
-			}
-			RuleOfSysInfo = append(RuleOfSysInfo, f)
-		} else {
-			c_log.GlobalLogger.Error("未知的topic:", topic2)
-			continue
-		}
-
-		label, err := open.Lookup("Label")
-		if err != nil {
-			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的TriggerName方法失败。", err)
-			continue
-		}
-		labelFunc, ok := label.(func() string)
-		if ok != true {
-			c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Label方法必须是(func() string):", err)
-			continue
-		}
-		labelString := labelFunc()
-		LabelMapTriggerId[labelString] = strconv.Itoa(trigger.TriggerId)
-		loadSuccess++
-		c_log.GlobalLogger.Info("主节点加载触发器插件:【ros topic】=", topic2, ",【触发器label】=", labelString, "【触发器ID】=", trigger.TriggerId)
-	}
-	c_log.GlobalLogger.Infof("一共有%v个触发器,加载成功了%v个,【label和id映射关系】=%v", len(config.PlatformConfig.TaskTriggers), loadSuccess, LabelMapTriggerId)
-}

+ 142 - 0
aarch64/pjibot_guide/master/package/config/trigger_init.go

@@ -0,0 +1,142 @@
+package config
+
+import (
+	"cicv-data-closedloop/aarch64/pjibot_guide/common/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
+	pji_msgs "cicv-data-closedloop/pjibot_guide_msgs"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
+	"plugin"
+	"slices"
+)
+
+func InitTriggerConfig() {
+	config.OssMutex.Lock()
+	defer config.OssMutex.Unlock()
+	triggerLocalPathsMapTriggerId := make(map[string]string)
+	c_log.GlobalLogger.Info("主节点加载触发器插件 - 开始。")
+	// 1 获取数采任务的触发器列表
+	cloudTriggers := &config.PlatformConfig.TaskTriggers
+	// 2 获取本地触发器列表(触发器目录的一级子目录为【触发器ID_触发器Label】)
+	localTriggerIds := util.GetFirstLevelSubdirectories(config.CloudConfig.TriggersDir)
+	// 3 对比触发器列表,本地没有的则下载
+	for _, trigger := range *cloudTriggers {
+		id := util.ToString(trigger.TriggerId)
+		hasIdDir := slices.Contains(localTriggerIds, id) // 改成了 slices 工具库
+		triggerLocalDir := config.CloudConfig.TriggersDir + id + "/"
+		hasLabelSo, soPaths := util.CheckSoFilesInDirectory(triggerLocalDir)
+		var triggerLocalPath string
+		if hasIdDir && hasLabelSo { // 已存在的触发器需要判断是否大小一致
+			triggerLocalPath = soPaths[0]
+			ossSize, _ := util.GetOSSFileSize(config.OssBucket, trigger.TriggerScriptPath)
+			localSize, _ := util.GetFileSize(triggerLocalPath)
+			if ossSize == localSize {
+				c_log.GlobalLogger.Info("触发器插件 ", triggerLocalPath, " 存在且与云端触发器大小一致。")
+				triggerLocalPathsMapTriggerId[triggerLocalPath] = id
+				continue
+			}
+		}
+		label := util.GetFileNameWithoutExtension(config.CloudConfig.TriggersDir + trigger.TriggerScriptPath)
+		triggerLocalPath = config.CloudConfig.TriggersDir + id + "/" + label + ".so"
+		c_log.GlobalLogger.Info("开始下载触发器插件从 ", trigger.TriggerScriptPath, " 到 ", triggerLocalPath)
+		_ = util.CreateParentDir(triggerLocalPath)
+		for {
+			if err := config.OssBucket.GetObjectToFile(trigger.TriggerScriptPath, triggerLocalPath); err != nil {
+				c_log.GlobalLogger.Error("下载触发器插件失败,再次尝试:", err)
+				continue
+			}
+			break
+		}
+		triggerLocalPathsMapTriggerId[triggerLocalPath] = id
+	}
+
+	success := 0
+	// 加载所有触发器的文件
+	for triggerLocalPath, triggerId := range triggerLocalPathsMapTriggerId {
+		// 载入插件到数组
+		open, err := plugin.Open(triggerLocalPath)
+		if err != nil {
+			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "失败。", err)
+			continue
+		}
+		topic0, err := open.Lookup("Topic")
+		if err != nil {
+			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Topic方法失败。", err)
+			continue
+		}
+		topic1, ok := topic0.(func() string)
+		if ok != true {
+			c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func() string):", err)
+			continue
+		}
+		topic2 := topic1()
+		rule, err := open.Lookup("Rule")
+		if err != nil {
+			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Rule方法失败。", err)
+			continue
+		}
+		if TopicOfDiagnostics == topic2 { // 1
+			if f, ok1 := rule.(func(*diagnostic_msgs.DiagnosticArray) string); ok1 {
+				RuleOfDiagnostics = append(RuleOfDiagnostics, f)
+				goto JudgeDone
+			}
+			log(triggerLocalPath)
+			continue
+		} else if TopicOfImu == topic2 { // 2
+			if f, ok1 := rule.(func(data *sensor_msgs.Imu) string); ok1 {
+				RuleOfImu = append(RuleOfImu, f)
+				goto JudgeDone
+			}
+			log(triggerLocalPath)
+			continue
+		} else if TopicOfLocateInfo == topic2 { // 3
+			if f, ok1 := rule.(func(data *pji_msgs.LocateInfo) string); ok1 {
+				RuleOfLocateInfo = append(RuleOfLocateInfo, f)
+				goto JudgeDone
+			}
+			log(triggerLocalPath)
+			continue
+		} else if TopicOfObstacleDetection == topic2 { // 4
+			if f, ok1 := rule.(func(data *std_msgs.UInt8) string); ok1 {
+				RuleOfObstacleDetection = append(RuleOfObstacleDetection, f)
+				goto JudgeDone
+			}
+			log(triggerLocalPath)
+			continue
+		} else if TopicOfOdom == topic2 { // 5
+			if f, ok1 := rule.(func(data *nav_msgs.Odometry) string); ok1 {
+				RuleOfOdom = append(RuleOfOdom, f)
+				goto JudgeDone
+			}
+			log(triggerLocalPath)
+			continue
+		} else if TopicOfSysInfo == topic2 { // 6
+			if f, ok1 := rule.(func(data *pji_msgs.SysInfo) string); ok1 {
+				RuleOfSysInfo = append(RuleOfSysInfo, f)
+				goto JudgeDone
+			}
+			log(triggerLocalPath)
+			continue
+		} else {
+			c_log.GlobalLogger.Error("未知的topic:", topic2)
+			continue
+		}
+	JudgeDone:
+		label, err := open.Lookup("Label")
+		if err != nil {
+			c_log.GlobalLogger.Error("加载本地插件 ", triggerLocalPath, " 中的 Label 方法失败。", err)
+			continue
+		}
+		labelFunc := label.(func() string)
+		labelString := labelFunc()
+		LabelMapTriggerId.Store(labelString, triggerId)
+		success++
+	}
+	c_log.GlobalLogger.Info("一共应加载", len(config.PlatformConfig.TaskTriggers), "个触发器,实际加载 ", success, " 个触发器。")
+}
+func log(triggerLocalPath string) {
+	c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的 Rule 方法参数格式不正确。")
+}

+ 34 - 0
aarch64/pjibot_guide/master/package/config/trigger_var.go

@@ -0,0 +1,34 @@
+package config
+
+import (
+	pji_msgs "cicv-data-closedloop/pjibot_guide_msgs"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
+	"sync"
+)
+
+var (
+	LabelMapTriggerId = new(sync.Map)
+
+	// 1
+	TopicOfDiagnostics = "/diagnostics"
+	RuleOfDiagnostics  []func(data *diagnostic_msgs.DiagnosticArray) string
+	// 2
+	TopicOfImu = "/imu"
+	RuleOfImu  []func(data *sensor_msgs.Imu) string
+	// 3
+	TopicOfLocateInfo = "/locate_info"
+	RuleOfLocateInfo  []func(data *pji_msgs.LocateInfo) string
+	// 4
+	TopicOfObstacleDetection = "/obstacle_detection"
+	RuleOfObstacleDetection  []func(data *std_msgs.UInt8) string
+	// 5
+	TopicOfOdom = "/odom"
+	RuleOfOdom  []func(data *nav_msgs.Odometry) string
+	// 6
+	TopicOfSysInfo = "/sys_info"
+	RuleOfSysInfo  []func(data *pji_msgs.SysInfo) string
+	// todo 这里加新的topic也需要在produce_window.go加新的订阅者
+)

+ 39 - 29
aarch64/pjibot_guide/master/package/service/produce_window.go

@@ -24,38 +24,48 @@ func PrepareTimeWindowProducerQueue() {
 	subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
 	subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
 	for i, topic := range commonConfig.SubscribeTopics {
-		c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
-		if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
-			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
-				Node:  commonConfig.RosNode,
-				Topic: topic,
-				Callback: func(data *std_msgs.UInt8) {
-					subscribersTimeMutexes[i].Lock()
-					if time.Since(subscribersTimes[i]).Seconds() > 1 {
-						subscribersMutexes[i].Lock()
-						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
-						var faultLabel string
-						for _, f := range masterConfig.RuleOfObstacleDetection {
-							faultLabel = f(data)
-							if faultLabel != "" {
-								if canCollect() {
-									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-									subscribersTimes[i] = time.Now()
-									break
+		for {
+			create := false // 判断是否创建成功,用于打印日志
+			if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *std_msgs.UInt8) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfObstacleDetection {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									}
 								}
 							}
+							subscribersMutexes[i].Unlock()
 						}
-						subscribersMutexes[i].Unlock()
-					}
-					subscribersTimeMutexes[i].Unlock()
-				},
-			})
-		}
-		if err != nil {
-			c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
-			//TODO 如何回传日志
-			continue
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			if err != nil {
+				c_log.GlobalLogger.Infof("创建订阅者报错,可能由于节点未启动,再次尝试【%v】", err)
+				time.Sleep(time.Duration(2) * time.Second)
+				continue
+			} else {
+				if create {
+					c_log.GlobalLogger.Infof("创建订阅者订阅话题【%v】", topic)
+				}
+				break
+			}
 		}
 	}
 

+ 321 - 32
aarch64/pjibot_patrol/master/package/service/produce_window.go

@@ -8,8 +8,13 @@ import (
 	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
 	commonUtil "cicv-data-closedloop/common/util"
+	"cicv-data-closedloop/pjibot_patrol_msgs"
 	"encoding/json"
 	"github.com/bluenviron/goroslib/v2"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/geometry_msgs"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
+	"github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
 	"github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
 	"sync"
 	"time"
@@ -24,41 +29,325 @@ func PrepareTimeWindowProducerQueue() {
 	subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
 	subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
 	for i, topic := range commonConfig.SubscribeTopics {
-		c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
-		if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
-			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
-				Node:  commonConfig.RosNode,
-				Topic: topic,
-				Callback: func(data *std_msgs.UInt8) {
-					subscribersTimeMutexes[i].Lock()
-					if time.Since(subscribersTimes[i]).Seconds() > 1 {
-						subscribersMutexes[i].Lock()
-						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
-						var faultLabel string
-						for _, f := range masterConfig.RuleOfObstacleDetection {
-							faultLabel = f(data)
-							if faultLabel != "" {
-								if canCollect() {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-									subscribersTimes[i] = time.Now()
-									break
-								} else {
-									c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+		for {
+			create := false // 判断是否创建成功,用于打印日志
+			// 1
+			if topic == masterConfig.TopicOfDiagnostics && len(masterConfig.RuleOfDiagnostics) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *diagnostic_msgs.DiagnosticArray) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfDiagnostics {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
 								}
 							}
+							subscribersMutexes[i].Unlock()
 						}
-						subscribersMutexes[i].Unlock()
-					}
-					subscribersTimeMutexes[i].Unlock()
-				},
-			})
-		}
-		if err != nil {
-			c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
-			//TODO 如何回传日志
-			continue
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 2
+			if topic == masterConfig.TopicOfImu && len(masterConfig.RuleOfImu) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *sensor_msgs.Imu) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfImu {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
+								}
+							}
+							subscribersMutexes[i].Unlock()
+						}
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 3
+			if topic == masterConfig.TopicOfLocateInfo && len(masterConfig.RuleOfLocateInfo) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *pjibot_patrol_msgs.LocateInfo) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfLocateInfo {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
+								}
+							}
+							subscribersMutexes[i].Unlock()
+						}
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 4
+			if topic == masterConfig.TopicOfObstacleDetection && len(masterConfig.RuleOfObstacleDetection) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *std_msgs.UInt8) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfObstacleDetection {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
+								}
+							}
+							subscribersMutexes[i].Unlock()
+						}
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 5
+			if topic == masterConfig.TopicOfOdom && len(masterConfig.RuleOfOdom) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *nav_msgs.Odometry) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfOdom {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
+								}
+							}
+							subscribersMutexes[i].Unlock()
+						}
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 6
+			if topic == masterConfig.TopicOfSysInfo && len(masterConfig.RuleOfSysInfo) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *pjibot_patrol_msgs.SysInfo) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfSysInfo {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
+								}
+							}
+							subscribersMutexes[i].Unlock()
+						}
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 7
+			if topic == masterConfig.TopicOfRobotPose && len(masterConfig.RuleOfRobotPose) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *geometry_msgs.PoseStamped) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfRobotPose {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
+								}
+							}
+							subscribersMutexes[i].Unlock()
+						}
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 8
+			if topic == masterConfig.TopicOfTaskFeedbackInfo && len(masterConfig.RuleOfTaskFeedbackInfo) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *pjibot_patrol_msgs.TaskFeedbackInfo) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfTaskFeedbackInfo {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
+								}
+							}
+							subscribersMutexes[i].Unlock()
+						}
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+			// 9
+			if topic == masterConfig.TopicOfWheelOdom && len(masterConfig.RuleOfWheelOdom) > 0 {
+				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+					Node:  commonConfig.RosNode,
+					Topic: topic,
+					Callback: func(data *nav_msgs.Odometry) {
+						subscribersTimeMutexes[i].Lock()
+						if time.Since(subscribersTimes[i]).Seconds() > 1 {
+							subscribersMutexes[i].Lock()
+							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
+							var faultLabel string
+							for _, f := range masterConfig.RuleOfWheelOdom {
+								faultLabel = f(data)
+								if faultLabel != "" {
+									if canCollect() {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+										subscribersTimes[i] = time.Now()
+										break
+									} else {
+										c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
+									}
+								}
+							}
+							subscribersMutexes[i].Unlock()
+						}
+						subscribersTimeMutexes[i].Unlock()
+					},
+				})
+				if err == nil {
+					create = true
+				}
+			}
+
+			if err != nil {
+				c_log.GlobalLogger.Infof("创建订阅者报错,可能由于节点未启动,再次尝试【%v】", err)
+				time.Sleep(time.Duration(2) * time.Second)
+				continue
+			} else {
+				if create {
+					c_log.GlobalLogger.Infof("创建订阅者订阅话题【%v】", topic)
+				}
+				break
+			}
 		}
 	}
 

+ 122 - 5
aarch64/pjisuv/master/service/produce_window.go

@@ -87,10 +87,7 @@ func ProduceWindow() {
 					}
 				}()
 			}
-
-			// 其他常规监听器
-			c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
-
+			create := false // 判断是否创建成功,用于打印日志
 			// 1
 			if topic == masterConfig.TopicOfAmrPose && (len(masterConfig.RuleOfAmrPose1) > 0 || len(masterConfig.RuleOfAmrPose3) > 0) {
 				subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
@@ -129,6 +126,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 2
@@ -171,6 +171,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 3
@@ -214,6 +217,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 4
@@ -257,6 +263,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 5
@@ -300,6 +309,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 6
@@ -343,6 +355,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 7
@@ -386,6 +401,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 8
@@ -429,6 +447,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 9
@@ -471,6 +492,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 10
@@ -514,6 +538,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 11 有共享变量的订阅者必须被创建
@@ -561,6 +588,9 @@ func ProduceWindow() {
 						shareVars.Store("DecisionType", data.Trajectoryinfo.DecisionType)
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 12 有共享变量的订阅者必须被创建
@@ -624,6 +654,9 @@ func ProduceWindow() {
 						}
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 13
@@ -668,6 +701,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 14
@@ -711,6 +747,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 15
@@ -755,6 +794,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 16
@@ -799,6 +841,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 17
@@ -842,6 +887,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 18
@@ -885,6 +933,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 19
@@ -928,6 +979,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 20
@@ -972,6 +1026,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 21
@@ -1015,6 +1072,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 22 有共享变量的订阅者必须被创建
@@ -1067,6 +1127,9 @@ func ProduceWindow() {
 						shareVars.Store("EgoThrottleCmdOfPjControlPub", egoThrottleCmdOfPjControlPub)
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 23
@@ -1110,6 +1173,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 24
@@ -1153,6 +1219,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 25
@@ -1196,6 +1265,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 26
@@ -1239,6 +1311,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 27
@@ -1282,6 +1357,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 28
@@ -1325,6 +1403,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 29
@@ -1368,6 +1449,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 30 有共享变量的订阅者必须被创建
@@ -1430,6 +1514,9 @@ func ProduceWindow() {
 						shareVars.Store("ObjSpeedDicOfTpperception", objSpeedDicOfTpperception)
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 31
@@ -1473,6 +1560,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 32
@@ -1516,6 +1606,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 33
@@ -1559,6 +1652,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 34
@@ -1602,6 +1698,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 35
@@ -1646,6 +1745,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 36 有共享变量的订阅者必须被创建
@@ -1698,6 +1800,9 @@ func ProduceWindow() {
 						shareVars.Store("ActStrWhAngOfDataRead", data.ActStrWhAng)
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 			// 37
 			if topic == masterConfig.TopicOfPjiGps &&
@@ -1740,6 +1845,9 @@ func ProduceWindow() {
 						subscribersTimeMutexes[i].Unlock()
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 
 			// 39 有共享变量的订阅者必须被创建
@@ -1783,6 +1891,9 @@ func ProduceWindow() {
 						shareVars.Store("AutomodeOfPjVehicleFdbPub", data.Automode)
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 			// 40 有共享变量的订阅者必须被创建
 			if topic == masterConfig.TopicOfEndPointMessage {
@@ -1827,12 +1938,18 @@ func ProduceWindow() {
 						shareVars.Store("EndPointY", data.Y)
 					},
 				})
+				if err == nil {
+					create = true
+				}
 			}
 			if err != nil {
-				c_log.GlobalLogger.Info("创建订阅者报错,可能由于节点未启动,再次尝试:", err)
+				c_log.GlobalLogger.Infof("创建订阅者报错,可能由于节点未启动,再次尝试【%v】", err)
 				time.Sleep(time.Duration(2) * time.Second)
 				continue
 			} else {
+				if create {
+					c_log.GlobalLogger.Infof("创建订阅者订阅话题【%v】", topic)
+				}
 				break
 			}
 		}