Просмотр исходного кода

feat: 配送及巡检机器人修改采集频率限制&根据目录所占空间大小做判断

HeWang 5 месяцев назад
Родитель
Сommit
26ac8f9684

+ 3 - 1
aarch64/pjibot_delivery/common/service/disk_clean.go

@@ -27,7 +27,9 @@ func DiskClean() {
 	for {
 		time.Sleep(1000 * time.Millisecond)
 		// 1 获取磁盘占用
-		diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name)
+		//diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name) // 获取整个磁盘空间
+
+		diskUsed, _ := util.GetDirectoryDiskUsed(commonConfig.CloudConfig.Disk.Path) // 获取指定目录空间
 		if diskUsed > commonConfig.CloudConfig.Disk.Used {
 			policy := commonConfig.PlatformConfig.TaskCachePolicy
 			c_log.GlobalLogger.Errorf("磁盘占用 %v 超过 %v,触发删除规则 %v", diskUsed, commonConfig.CloudConfig.Disk.Used, policyToDescription[policy])

+ 24 - 0
aarch64/pjibot_delivery/common/service/rosbag_upload.go

@@ -7,6 +7,8 @@ import (
 	"cicv-data-closedloop/common/domain"
 	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
+	commonUtil "cicv-data-closedloop/common/util"
+	"encoding/json"
 	"fmt"
 	"os"
 	"strings"
@@ -148,6 +150,8 @@ outLoop:
 			c_log.GlobalLogger.Error("上传 callback.json 文件失败:", err)
 		}
 
+		// 数据库中采集数量加一
+		collectNumPlus()
 		// 删除本地所有已上传的bag文件
 		c_log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
@@ -157,6 +161,26 @@ outLoop:
 		if len(entity.TimeWindowConsumerQueue) == 0 {
 			c_log.GlobalLogger.Infof("已处理所有窗口,重启 record 命令。")
 			ChannelKillRosRecord <- 2
+			entity.ProcessingFlag = false
 		}
 	}
 }
+
+func collectNumPlus() {
+	responseString, err := commonUtil.HttpPostJsonWithHeaders(
+		commonConfig.CloudConfig.CollectNumPlus.Url,
+		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+		map[string]string{
+			"snCode": commonConfig.LocalConfig.SecretKey,
+		},
+	)
+	if err != nil {
+		c_log.GlobalLogger.Error("发送http请求修改采集数量失败:", err)
+	}
+	// 解析JSON字符串到Response结构体
+	var resp entity.Response
+	err = json.Unmarshal([]byte(responseString), &resp)
+	if err != nil {
+		c_log.GlobalLogger.Error("解析修改采集数量结果失败:", err)
+	}
+}

+ 1 - 0
aarch64/pjibot_delivery/master/package/service/move_bag_and_send_window.go

@@ -59,6 +59,7 @@ func RunTimeWindowProducerQueue() {
 				time.Sleep(time.Duration(2) * time.Second)
 				c_log.GlobalLogger.Info("采集数据,发送record命令进程关闭信号。")
 				commonService.ChannelKillRosRecord <- 3
+				entity.ProcessingFlag = true
 				domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
 				// 将时间窗口移出准备队列
 				entity.RemoveHeadOfTimeWindowProducerQueue()

+ 97 - 53
aarch64/pjibot_delivery/master/package/service/produce_window.go

@@ -43,9 +43,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *diagnostic_msgs.DiagnosticArray) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -55,6 +52,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfDiagnostics {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -76,9 +76,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *sensor_msgs.Imu) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -88,6 +85,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfImu {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -111,9 +111,6 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *pjibot_delivery_msgs.LocateInfo) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
-							if !canCollect() {
-								return
-							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -121,6 +118,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfLocateInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -142,9 +142,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *std_msgs.UInt8) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -154,6 +151,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfObstacleDetection {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -177,9 +177,6 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *nav_msgs.Odometry) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
-							if !canCollect() {
-								return
-							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -187,6 +184,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfOdom {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -210,9 +210,6 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *pjibot_delivery_msgs.SysInfo) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
-							if !canCollect() {
-								return
-							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -220,6 +217,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfSysInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -241,9 +241,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *geometry_msgs.PoseStamped) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -253,6 +250,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfRobotPose {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -274,9 +274,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_delivery_msgs.TaskFeedbackInfo) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -286,6 +283,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfTaskFeedbackInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -307,9 +307,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -319,6 +316,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfWheelOdom {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -414,34 +414,78 @@ func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []st
 
 // 判断采集包数量是否超过限额
 func canCollect() bool {
-	responseString, err := commonUtil.HttpPostJsonWithHeaders(
-		commonConfig.CloudConfig.CollectLimit.Url,
-		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
-		map[string]string{
-			"snCode":            commonConfig.LocalConfig.SecretKey,
-			"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
-			"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
-			"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
-			"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
-		},
-	)
-	if err != nil {
-		c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
-		return false
-	}
-	// 解析JSON字符串到Response结构体
-	var resp entity.Response
-	err = json.Unmarshal([]byte(responseString), &resp)
-	if err != nil {
-		c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
-		return false
-	}
-	if resp.Code != 200 { // 不是200 代表不允许采集
-		return false
+	// 如果开启了采集频率限制,则云端判断采集数量是否超过限额
+	if commonConfig.CloudConfig.CollectLimit.Flag == 1 {
+		c_log.GlobalLogger.Error("当前设备已开启数采频率限制,需判断采集数量是否达到限额。")
+		responseString, err := commonUtil.HttpPostJsonWithHeaders(
+			commonConfig.CloudConfig.CollectLimit.Url,
+			map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+			map[string]string{
+				"snCode":            commonConfig.LocalConfig.SecretKey,
+				"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+				"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+				"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+				"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+			},
+		)
+		if err != nil {
+			c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+			return false
+		}
+		// 解析JSON字符串到Response结构体
+		var resp entity.Response
+		err = json.Unmarshal([]byte(responseString), &resp)
+		if err != nil {
+			c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+			return false
+		}
+		if resp.Code != 200 { // 不是200 代表不允许采集
+			c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
+			return false
+		}
+	} else {
+		c_log.GlobalLogger.Error("当前设备未开启数采频率限制,无需判断采集数量是否达到限额。")
 	}
-	if len(entity.TimeWindowConsumerQueue) != 0 {
-		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+
+	// 本地判断是否存在正在处理的数据
+	if entity.ProcessingFlag {
+		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。")
 		return false
 	}
+
+	c_log.GlobalLogger.Info("允许采集。")
 	return true
 }
+
+//func canCollect() bool {
+//	responseString, err := commonUtil.HttpPostJsonWithHeaders(
+//		commonConfig.CloudConfig.CollectLimit.Url,
+//		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+//		map[string]string{
+//			"snCode":            commonConfig.LocalConfig.SecretKey,
+//			"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+//			"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+//			"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+//			"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+//		},
+//	)
+//	if err != nil {
+//		c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+//		return false
+//	}
+//	// 解析JSON字符串到Response结构体
+//	var resp entity.Response
+//	err = json.Unmarshal([]byte(responseString), &resp)
+//	if err != nil {
+//		c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+//		return false
+//	}
+//	if resp.Code != 200 { // 不是200 代表不允许采集
+//		return false
+//	}
+//	if len(entity.TimeWindowConsumerQueue) != 0 {
+//		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+//		return false
+//	}
+//	return true
+//}

+ 3 - 1
aarch64/pjibot_patrol/common/service/disk_clean.go

@@ -27,7 +27,9 @@ func DiskClean() {
 	for {
 		time.Sleep(1000 * time.Millisecond)
 		// 1 获取磁盘占用
-		diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name)
+		//diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name) // 获取整个磁盘空间
+
+		diskUsed, _ := util.GetDirectoryDiskUsed(commonConfig.CloudConfig.Disk.Path) // 获取指定目录空间
 		if diskUsed > commonConfig.CloudConfig.Disk.Used {
 			policy := commonConfig.PlatformConfig.TaskCachePolicy
 			c_log.GlobalLogger.Errorf("磁盘占用 %v 超过 %v,触发删除规则 %v", diskUsed, commonConfig.CloudConfig.Disk.Used, policyToDescription[policy])

+ 24 - 0
aarch64/pjibot_patrol/common/service/rosbag_upload.go

@@ -7,6 +7,8 @@ import (
 	"cicv-data-closedloop/common/domain"
 	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
+	commonUtil "cicv-data-closedloop/common/util"
+	"encoding/json"
 	"fmt"
 	"os"
 	"strings"
@@ -148,6 +150,8 @@ outLoop:
 			c_log.GlobalLogger.Error("上传 callback.json 文件失败:", err)
 		}
 
+		// 数据库中采集数量加一
+		collectNumPlus()
 		// 删除本地所有已上传的bag文件
 		c_log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
@@ -157,7 +161,27 @@ outLoop:
 		if len(entity.TimeWindowConsumerQueue) == 0 {
 			c_log.GlobalLogger.Infof("已处理所有窗口,重启 record 命令。")
 			ChannelKillRosRecord <- 2
+			entity.ProcessingFlag = false
 		}
 
 	}
 }
+
+func collectNumPlus() {
+	responseString, err := commonUtil.HttpPostJsonWithHeaders(
+		commonConfig.CloudConfig.CollectNumPlus.Url,
+		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+		map[string]string{
+			"snCode": commonConfig.LocalConfig.SecretKey,
+		},
+	)
+	if err != nil {
+		c_log.GlobalLogger.Error("发送http请求修改采集数量失败:", err)
+	}
+	// 解析JSON字符串到Response结构体
+	var resp entity.Response
+	err = json.Unmarshal([]byte(responseString), &resp)
+	if err != nil {
+		c_log.GlobalLogger.Error("解析修改采集数量结果失败:", err)
+	}
+}

+ 1 - 0
aarch64/pjibot_patrol/master/package/service/move_bag_and_send_window.go

@@ -59,6 +59,7 @@ func RunTimeWindowProducerQueue() {
 				time.Sleep(time.Duration(2) * time.Second)
 				c_log.GlobalLogger.Info("采集数据,发送record命令进程关闭信号。")
 				commonService.ChannelKillRosRecord <- 3
+				entity.ProcessingFlag = true
 				domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
 				// 将时间窗口移出准备队列
 				entity.RemoveHeadOfTimeWindowProducerQueue()

+ 97 - 53
aarch64/pjibot_patrol/master/package/service/produce_window.go

@@ -43,9 +43,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *diagnostic_msgs.DiagnosticArray) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -55,6 +52,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfDiagnostics {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -76,9 +76,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *sensor_msgs.Imu) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -88,6 +85,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfImu {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -109,9 +109,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.LocateInfo) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -121,6 +118,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfLocateInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -142,9 +142,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *std_msgs.UInt8) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -154,6 +151,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfObstacleDetection {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -175,9 +175,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -187,6 +184,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfOdom {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -208,9 +208,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.SysInfo) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -220,6 +217,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfSysInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -241,9 +241,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *geometry_msgs.PoseStamped) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -253,6 +250,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfRobotPose {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -274,9 +274,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.TaskFeedbackInfo) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -286,6 +283,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfTaskFeedbackInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -307,9 +307,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -319,6 +316,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfWheelOdom {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -414,34 +414,78 @@ func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []st
 
 // 判断采集包数量是否超过限额
 func canCollect() bool {
-	responseString, err := commonUtil.HttpPostJsonWithHeaders(
-		commonConfig.CloudConfig.CollectLimit.Url,
-		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
-		map[string]string{
-			"snCode":            commonConfig.LocalConfig.SecretKey,
-			"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
-			"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
-			"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
-			"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
-		},
-	)
-	if err != nil {
-		c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
-		return false
-	}
-	// 解析JSON字符串到Response结构体
-	var resp entity.Response
-	err = json.Unmarshal([]byte(responseString), &resp)
-	if err != nil {
-		c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
-		return false
-	}
-	if resp.Code != 200 { // 不是200 代表不允许采集
-		return false
+	// 如果开启了采集频率限制,则云端判断采集数量是否超过限额
+	if commonConfig.CloudConfig.CollectLimit.Flag == 1 {
+		c_log.GlobalLogger.Error("当前设备已开启数采频率限制,需判断采集数量是否达到限额。")
+		responseString, err := commonUtil.HttpPostJsonWithHeaders(
+			commonConfig.CloudConfig.CollectLimit.Url,
+			map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+			map[string]string{
+				"snCode":            commonConfig.LocalConfig.SecretKey,
+				"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+				"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+				"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+				"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+			},
+		)
+		if err != nil {
+			c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+			return false
+		}
+		// 解析JSON字符串到Response结构体
+		var resp entity.Response
+		err = json.Unmarshal([]byte(responseString), &resp)
+		if err != nil {
+			c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+			return false
+		}
+		if resp.Code != 200 { // 不是200 代表不允许采集
+			c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
+			return false
+		}
+	} else {
+		c_log.GlobalLogger.Error("当前设备未开启数采频率限制,无需判断采集数量是否达到限额。")
 	}
-	if len(entity.TimeWindowConsumerQueue) != 0 {
-		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+
+	// 本地判断是否存在正在处理的数据
+	if entity.ProcessingFlag {
+		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。")
 		return false
 	}
+
+	c_log.GlobalLogger.Info("允许采集。")
 	return true
 }
+
+//func canCollect() bool {
+//	responseString, err := commonUtil.HttpPostJsonWithHeaders(
+//		commonConfig.CloudConfig.CollectLimit.Url,
+//		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+//		map[string]string{
+//			"snCode":            commonConfig.LocalConfig.SecretKey,
+//			"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+//			"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+//			"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+//			"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+//		},
+//	)
+//	if err != nil {
+//		c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+//		return false
+//	}
+//	// 解析JSON字符串到Response结构体
+//	var resp entity.Response
+//	err = json.Unmarshal([]byte(responseString), &resp)
+//	if err != nil {
+//		c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+//		return false
+//	}
+//	if resp.Code != 200 { // 不是200 代表不允许采集
+//		return false
+//	}
+//	if len(entity.TimeWindowConsumerQueue) != 0 {
+//		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+//		return false
+//	}
+//	return true
+//}