Эх сурвалжийг харах

refactor: do not collect data during data processing

HeWang 6 сар өмнө
parent
commit
1eb52b5415

+ 58 - 90
aarch64/pjibot_delivery/master/package/service/produce_window.go

@@ -43,6 +43,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *diagnostic_msgs.DiagnosticArray) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -53,16 +56,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -80,6 +76,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *sensor_msgs.Imu) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -90,16 +89,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -119,6 +111,9 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *pjibot_delivery_msgs.LocateInfo) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
+							if !canCollect() {
+								return
+							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -127,16 +122,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -154,6 +142,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *std_msgs.UInt8) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -164,16 +155,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -193,6 +177,9 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *nav_msgs.Odometry) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
+							if !canCollect() {
+								return
+							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -201,16 +188,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -230,6 +210,9 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *pjibot_delivery_msgs.SysInfo) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
+							if !canCollect() {
+								return
+							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -238,16 +221,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -265,6 +241,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *geometry_msgs.PoseStamped) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -275,16 +254,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -302,6 +274,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_delivery_msgs.TaskFeedbackInfo) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -312,16 +287,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -339,6 +307,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -349,16 +320,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -475,5 +439,9 @@ func canCollect() bool {
 	if resp.Code != 200 { // 不是200 代表不允许采集
 		return false
 	}
+	if len(entity.TimeWindowConsumerQueue) != 0 {
+		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+		return false
+	}
 	return true
 }

+ 4 - 0
aarch64/pjibot_guide/master/package/service/produce_window.go

@@ -169,6 +169,10 @@ func canCollect() bool {
 		c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
 		return false
 	}
+	if len(entity.TimeWindowConsumerQueue) != 0 {
+		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+		return false
+	}
 
 	c_log.GlobalLogger.Info("允许采集。")
 	return true

+ 58 - 90
aarch64/pjibot_patrol/master/package/service/produce_window.go

@@ -43,6 +43,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *diagnostic_msgs.DiagnosticArray) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -53,16 +56,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -80,6 +76,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *sensor_msgs.Imu) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -90,16 +89,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -117,6 +109,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.LocateInfo) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -127,16 +122,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -154,6 +142,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *std_msgs.UInt8) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -164,16 +155,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -191,6 +175,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -201,16 +188,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -228,6 +208,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.SysInfo) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -238,16 +221,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -265,6 +241,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *geometry_msgs.PoseStamped) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -275,16 +254,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -302,6 +274,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.TaskFeedbackInfo) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -312,16 +287,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -339,6 +307,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -349,16 +320,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -475,5 +439,9 @@ func canCollect() bool {
 	if resp.Code != 200 { // 不是200 代表不允许采集
 		return false
 	}
+	if len(entity.TimeWindowConsumerQueue) != 0 {
+		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+		return false
+	}
 	return true
 }