瀏覽代碼

Merge branch 'master' of https://gitee.com/lingxinmeng/cicv-data-closedloop

LingxinMeng 6 月之前
父節點
當前提交
c712825772

+ 33 - 0
.idea/dataSources.xml

@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="DataSourceManagerImpl" format="xml" multifile-model="true">
+    <data-source source="LOCAL" name="36.110.106.156" uuid="d3edc464-7775-4251-9430-305a6c8c39c3">
+      <driver-ref>mysql.8</driver-ref>
+      <synchronize>true</synchronize>
+      <jdbc-driver>com.mysql.cj.jdbc.Driver</jdbc-driver>
+      <jdbc-url>jdbc:mysql://36.110.106.156:3306</jdbc-url>
+      <working-dir>$ProjectFileDir$</working-dir>
+    </data-source>
+    <data-source source="LOCAL" name="@localhost" uuid="656b84ef-baa2-423d-95fe-000a2972e769">
+      <driver-ref>redis</driver-ref>
+      <synchronize>true</synchronize>
+      <jdbc-driver>jdbc.RedisDriver</jdbc-driver>
+      <jdbc-url>jdbc:redis://36.110.106.156:6379/0</jdbc-url>
+      <working-dir>$ProjectFileDir$</working-dir>
+    </data-source>
+    <data-source source="LOCAL" name="dataclosedloop@36.110.106.156" uuid="cf5c8d28-0670-425d-af03-a6bf1b888cec">
+      <driver-ref>mysql.8</driver-ref>
+      <synchronize>true</synchronize>
+      <jdbc-driver>com.mysql.cj.jdbc.Driver</jdbc-driver>
+      <jdbc-url>jdbc:mysql://36.110.106.156:3306/dataclosedloop</jdbc-url>
+      <working-dir>$ProjectFileDir$</working-dir>
+    </data-source>
+    <data-source source="LOCAL" name="pji_desktop@10.14.85.240" uuid="a6377c8d-eff0-4172-836c-f79e4bd0b4e7">
+      <driver-ref>mysql.8</driver-ref>
+      <synchronize>true</synchronize>
+      <jdbc-driver>com.mysql.cj.jdbc.Driver</jdbc-driver>
+      <jdbc-url>jdbc:mysql://10.14.85.240:3306/pji_desktop</jdbc-url>
+      <working-dir>$ProjectFileDir$</working-dir>
+    </data-source>
+  </component>
+</project>

+ 58 - 90
aarch64/pjibot_delivery/master/package/service/produce_window.go

@@ -43,6 +43,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *diagnostic_msgs.DiagnosticArray) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -53,16 +56,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -80,6 +76,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *sensor_msgs.Imu) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -90,16 +89,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -119,6 +111,9 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *pjibot_delivery_msgs.LocateInfo) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
+							if !canCollect() {
+								return
+							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -127,16 +122,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -154,6 +142,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *std_msgs.UInt8) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -164,16 +155,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -193,6 +177,9 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *nav_msgs.Odometry) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
+							if !canCollect() {
+								return
+							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -201,16 +188,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -230,6 +210,9 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *pjibot_delivery_msgs.SysInfo) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
+							if !canCollect() {
+								return
+							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -238,16 +221,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -265,6 +241,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *geometry_msgs.PoseStamped) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -275,16 +254,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -302,6 +274,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_delivery_msgs.TaskFeedbackInfo) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -312,16 +287,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -339,6 +307,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -349,16 +320,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -475,5 +439,9 @@ func canCollect() bool {
 	if resp.Code != 200 { // 不是200 代表不允许采集
 		return false
 	}
+	if len(entity.TimeWindowConsumerQueue) != 0 {
+		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+		return false
+	}
 	return true
 }

+ 2 - 0
aarch64/pjibot_guide/common/config/c_cloud.go

@@ -49,6 +49,7 @@ type TriggerStruct struct {
 
 type CollectLimitStruct struct {
 	Url   string `yaml:"url"`
+	Flag  int    `yaml:"flag"`
 	Day   int    `yaml:"day"`
 	Week  int    `yaml:"week"`
 	Month int    `yaml:"month"`
@@ -63,6 +64,7 @@ type DataDirStruct struct {
 
 type cloudConfig struct {
 	CollectLimit          CollectLimitStruct `yaml:"collect-limit"`
+	CollectNumPlus        CollectLimitStruct `yaml:"collect-num-plus"`
 	DataDir               DataDirStruct      `yaml:"data-dir"`
 	MapBufFiles           []string           `yaml:"map-buf-files"`
 	FullCollect           bool               `yaml:"full-collect"`

+ 1 - 1
aarch64/pjibot_guide/common/config/c_websocket.go

@@ -97,7 +97,7 @@ func InitWebsocketConfig() {
 		c_log.GlobalLogger.Info("初始化Websocket连接 - 成功。")
 
 		// 保持连接活跃
-		//go keepAlive()
+		go keepAlive()
 
 		// 连接成功,退出循环
 		reconnectionInProgress = false

+ 24 - 0
aarch64/pjibot_guide/common/service/rosbag_upload.go

@@ -7,6 +7,8 @@ import (
 	"cicv-data-closedloop/common/domain"
 	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
+	commonUtil "cicv-data-closedloop/common/util"
+	"encoding/json"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -212,6 +214,8 @@ outLoop:
 			//commonConfig.OssMutex.Unlock()
 		}
 
+		// 数据库中采集数量加一
+		collectNumPlus()
 		// 删除本地所有已上传的bag文件
 		c_log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
@@ -221,6 +225,26 @@ outLoop:
 		if len(entity.TimeWindowConsumerQueue) == 0 {
 			c_log.GlobalLogger.Infof("已处理所有窗口,重启 record 命令。")
 			ChannelKillRosRecord <- 2
+			entity.ProcessingFlag = false
 		}
 	}
 }
+
+func collectNumPlus() {
+	responseString, err := commonUtil.HttpPostJsonWithHeaders(
+		commonConfig.CloudConfig.CollectNumPlus.Url,
+		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+		map[string]string{
+			"snCode": commonConfig.LocalConfig.SecretKey,
+		},
+	)
+	if err != nil {
+		c_log.GlobalLogger.Error("发送http请求修改采集数量失败:", err)
+	}
+	// 解析JSON字符串到Response结构体
+	var resp entity.Response
+	err = json.Unmarshal([]byte(responseString), &resp)
+	if err != nil {
+		c_log.GlobalLogger.Error("解析修改采集数量结果失败:", err)
+	}
+}

+ 1 - 0
aarch64/pjibot_guide/master/package/service/move_bag_and_send_window.go

@@ -60,6 +60,7 @@ func RunTimeWindowProducerQueue() {
 				time.Sleep(time.Duration(2) * time.Second)
 				c_log.GlobalLogger.Info("采集数据,发送record命令进程关闭信号。")
 				commonService.ChannelKillRosRecord <- 3
+				entity.ProcessingFlag = true
 				domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
 				// 将时间窗口移出准备队列
 				entity.RemoveHeadOfTimeWindowProducerQueue()

+ 39 - 27
aarch64/pjibot_guide/master/package/service/produce_window.go

@@ -37,9 +37,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *std_msgs.UInt8) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -49,6 +46,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfObstacleDetection {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										return
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -141,32 +141,44 @@ func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []st
 	return faultCodeTopics, nil
 }
 
-// 判断采集包数量是否超过限额
+// 判断是否可采集数据
 func canCollect() bool {
-	responseString, err := commonUtil.HttpPostJsonWithHeaders(
-		commonConfig.CloudConfig.CollectLimit.Url,
-		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
-		map[string]string{
-			"snCode":            commonConfig.LocalConfig.SecretKey,
-			"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
-			"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
-			"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
-			"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
-		},
-	)
-	if err != nil {
-		c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
-		return false
-	}
-	// 解析JSON字符串到Response结构体
-	var resp entity.Response
-	err = json.Unmarshal([]byte(responseString), &resp)
-	if err != nil {
-		c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
-		return false
+	// 如果开启了采集频率限制,则云端判断采集数量是否超过限额
+	if commonConfig.CloudConfig.CollectLimit.Flag == 1 {
+		c_log.GlobalLogger.Error("当前设备已开启数采频率限制,需判断采集数量是否达到限额。")
+		responseString, err := commonUtil.HttpPostJsonWithHeaders(
+			commonConfig.CloudConfig.CollectLimit.Url,
+			map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+			map[string]string{
+				"snCode":            commonConfig.LocalConfig.SecretKey,
+				"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+				"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+				"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+				"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+			},
+		)
+		if err != nil {
+			c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+			return false
+		}
+		// 解析JSON字符串到Response结构体
+		var resp entity.Response
+		err = json.Unmarshal([]byte(responseString), &resp)
+		if err != nil {
+			c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+			return false
+		}
+		if resp.Code != 200 { // 不是200 代表不允许采集
+			c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
+			return false
+		}
+	} else {
+		c_log.GlobalLogger.Error("当前设备未开启数采频率限制,无需判断采集数量是否达到限额。")
 	}
-	if resp.Code != 200 { // 不是200 代表不允许采集
-		c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
+
+	// 本地判断是否存在正在处理的数据
+	if entity.ProcessingFlag {
+		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。")
 		return false
 	}
 

+ 3 - 0
aarch64/pjibot_guide/引导机器人默认配置文件-cloud-config.yaml

@@ -1,10 +1,13 @@
 ---
 collect-limit:
   url: http://36.110.106.142:12341/web_server/collect_limit/can_collect
+  flag: 1 # 数采频率限制标志 0 - 关闭数采频率限制  1 - 开启数采频率限制
   day: 1
   week: 3
   month: 12
   year: 144
+collect-num-plus:
+  url: http://36.110.106.142:12341/web_server/collect_limit/plus_collect_num
 monitor:
   url: http://36.110.106.142:12341/web_server/monitor/insert
 platform:

+ 58 - 90
aarch64/pjibot_patrol/master/package/service/produce_window.go

@@ -43,6 +43,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *diagnostic_msgs.DiagnosticArray) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -53,16 +56,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -80,6 +76,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *sensor_msgs.Imu) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -90,16 +89,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -117,6 +109,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.LocateInfo) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -127,16 +122,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -154,6 +142,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *std_msgs.UInt8) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -164,16 +155,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -191,6 +175,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -201,16 +188,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -228,6 +208,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.SysInfo) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -238,16 +221,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -265,6 +241,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *geometry_msgs.PoseStamped) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -275,16 +254,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -302,6 +274,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.TaskFeedbackInfo) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -312,16 +287,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -339,6 +307,9 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
+						if !canCollect() {
+							return
+						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -349,16 +320,9 @@ func PrepareTimeWindowProducerQueue() {
 								faultLabel = f(data)
 								if faultLabel != "" {
 									subscribersTimes[i] = time.Now()
-									if canCollect() {
-										c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
-										saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-										break
-									} else {
-										if time.Since(logTime).Seconds() > logInterval {
-											logTime = time.Now()
-											c_log.GlobalLogger.Errorf("触发事件【%v】,但当前周期内采集数量已超限额,不再采集。", faultLabel)
-										}
-									}
+									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
+									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+									break
 								}
 							}
 							subscribersMutexes[i].Unlock()
@@ -475,5 +439,9 @@ func canCollect() bool {
 	if resp.Code != 200 { // 不是200 代表不允许采集
 		return false
 	}
+	if len(entity.TimeWindowConsumerQueue) != 0 {
+		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+		return false
+	}
 	return true
 }

+ 14 - 3
amd64/web_server/src/infrastructure/persistence/mapper_collect_limit.go

@@ -28,7 +28,18 @@ func InsertNew(snCode string) {
 	}
 }
 
-func UpdateCollectLimit(snCode string) error {
+func UpdateCollectLimit(snCode string) bool {
+	// 查询记录
+	var resultPos []collect.LimitPo
+	selectSql, _ := util.ReadFile(c_db.SqlFilesMap["collect_limit-select-by-sn_code.sql"])
+	err := c_db.MysqlDb.Select(&resultPos, selectSql, snCode)
+	if err != nil {
+		c_log.GlobalLogger.Error("查询数据报错:", err)
+		return false
+	}
+	if len(resultPos) == 0 {
+		return false
+	}
 	// 更新记录结束时间为默认时间
 	sqlTemplate, _ := util.ReadFile(c_db.SqlFilesMap["collect_limit-add_one.sql"])
 	c_log.GlobalLogger.Error("执行 sql:", sqlTemplate)
@@ -36,9 +47,9 @@ func UpdateCollectLimit(snCode string) error {
 		snCode,
 	}); err != nil {
 		c_log.GlobalLogger.Error("插入数据报错:", err)
-		return err
+		return false
 	}
-	return nil
+	return true
 }
 
 //

+ 2 - 2
amd64/web_server/src/interfaces/api/h_collect_limit.go

@@ -78,8 +78,8 @@ func CanCollectPlus(c *gin.Context) {
 	}
 	c_log.GlobalLogger.Info("请求体为:", param)
 	// 所有值添加一
-	err := persistence.UpdateCollectLimit(param.SnCode)
-	if err != nil {
+	flag := persistence.UpdateCollectLimit(param.SnCode)
+	if !flag {
 		c.JSON(http.StatusOK, commonEntity.Response{
 			Code: 400,
 			Msg:  "设备采集数量更新失败。",

+ 1 - 0
common/entity/time_window.go

@@ -17,6 +17,7 @@ var (
 
 	TcpSendTime      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
 	TcpSendTimeMutex sync.Mutex
+	ProcessingFlag   = false // 是否有数据正在被处理
 )
 
 type TimeWindow struct {