Przeglądaj źródła

feat: 启动任务时检查是否达到采集限额

HeWang 5 miesięcy temu
rodzic
commit
9fb3a5d138

+ 70 - 19
aarch64/pjibot_delivery/control/main.go

@@ -8,6 +8,8 @@ import (
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
+	commonUtil "cicv-data-closedloop/common/util"
+	"encoding/json"
 	"fmt"
 	"net/rpc"
 	"os"
@@ -16,20 +18,23 @@ import (
 )
 
 var (
-	applicationName = "pji-control"
-	localStatus     = "idle"
-	cloudStatus     = "NONE"
-	lastLocalStatus = "idle"
-	lastCloudStatus = "NONE"
-	localTurnLength = 1  // s,本地状态刷新时间
-	cloudTurnLength = 60 // s,云端状态刷新时间
-	renewTurnLength = 3  // s,续约状态刷新时间
-	waitStopLength  = 1  // min,停止master前等待时间
-	launchedFlag    = false
-	renewedFlag     = false
-	renewTimer      *time.Timer // 续约定时器
-	RenewDur        = 5         // min, 续约时间
-	maxRetryCount   = 10        // 查询配置最大重试次数
+	applicationName  = "pji-control"
+	localStatus      = "idle"
+	cloudStatus      = "NONE"
+	lastLocalStatus  = "idle"
+	lastCloudStatus  = "NONE"
+	timeAllowedFlag  = false
+	limitReachedFlag = false
+	localTurnLength  = 1  // s,本地状态刷新时间
+	cloudTurnLength  = 60 // s,云端状态刷新时间
+	renewTurnLength  = 3  // s,续约状态刷新时间
+	waitStopLength   = 1  // min,停止master前等待时间
+	checkLimitLength = 1  // min, 查询是否达到采集限制等待时间
+	launchedFlag     = false
+	renewedFlag      = false
+	renewTimer       *time.Timer // 续约定时器
+	RenewDur         = 5         // min, 续约时间
+	maxRetryCount    = 10        // 查询配置最大重试次数
 )
 
 func init() {
@@ -46,7 +51,7 @@ func init() {
 	commonConfig.InitWebsocketConfig()
 }
 
-func IsTimeAllowed(currentTime time.Time) bool {
+func isTimeAllowed(currentTime time.Time) bool {
 	cw := commonConfig.CloudConfig.CollectWindow
 	if cw.Flag == 0 { // 关闭固定时间段采集, 则都返回true
 		return true
@@ -74,6 +79,43 @@ func IsTimeAllowed(currentTime time.Time) bool {
 	return !currentTime.Before(start) && currentTime.Before(end)
 }
 
+func checkCollectLimit() {
+	for {
+		time.Sleep(time.Duration(checkLimitLength) * time.Minute)
+		// 如果开启了采集频率限制,则云端判断采集数量是否超过限额
+		if commonConfig.CloudConfig.CollectLimit.Flag == 1 {
+			//c_log.GlobalLogger.Error("当前设备已开启数采频率限制,需判断采集数量是否达到限额。")
+			responseString, err := commonUtil.HttpPostJsonWithHeaders(
+				commonConfig.CloudConfig.CollectLimit.Url,
+				map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+				map[string]string{
+					"snCode":            commonConfig.LocalConfig.SecretKey,
+					"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+					"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+					"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+					"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+				},
+			)
+			if err != nil {
+				c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+				continue
+			}
+			// 解析JSON字符串到Response结构体
+			var resp entity.Response
+			err = json.Unmarshal([]byte(responseString), &resp)
+			if err != nil {
+				c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+				continue
+			}
+			if resp.Code != 200 { // 不是200 代表采集数量已超过限额不允许采集
+				//c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
+				limitReachedFlag = true
+			}
+		}
+		limitReachedFlag = false
+	}
+}
+
 func initRenew() {
 	c_log.GlobalLogger.Info("启动定时器 - 开始。")
 	if renewTimer != nil {
@@ -162,6 +204,9 @@ func main() {
 	// 定期检查本地任务状态,执行续约,避免短时间内多次启停
 	go renew()
 
+	// 检查是否达到限额
+	go checkCollectLimit()
+
 	// 云端任务状态负责更新配置
 	go pkg.GetCloudConfig(&cloudStatus, &lastCloudStatus, cloudTurnLength)
 
@@ -175,6 +220,12 @@ func main() {
 		fmt.Println("localStatus: ", localStatus, "lastLocalStatus: ", lastLocalStatus)
 		fmt.Println("cloudStatus: ", cloudStatus, "lastCloudStatus: ", lastCloudStatus)
 
+		// 采集时间限制
+		currentTime := time.Now()
+		timeAllowedFlag = isTimeAllowed(currentTime)
+
+		// 采集频率限制
+
 		// 综合判断 cloudStatus 和 localStatus
 		// cloudStatus
 		// UN_CHANGE 没有新的任务,无需更改
@@ -185,16 +236,16 @@ func main() {
 		// running 繁忙状态,此状态需要启动数采任务
 		// error 错误状态,此状态下不启动数采任务
 
-		// 本地任务状态负责启停master
-		if localStatus == "running" {
+		// 启停master
+		if localStatus == "running" && timeAllowedFlag && !limitReachedFlag {
 			if !launchedFlag { // 目前未启动数采程序
 				c_log.GlobalLogger.Info("数采程序启动 - 开始。")
 				startMasterNode()
 			}
-		} else if localStatus == "idle" {
+		} else if localStatus == "idle" || !timeAllowedFlag || !limitReachedFlag {
 			if !renewedFlag && launchedFlag && len(entity.TimeWindowConsumerQueue) == 0 {
 				time.Sleep(time.Duration(waitStopLength) * time.Minute)
-				c_log.GlobalLogger.Info("设备不在运行状态且没有待处理的数据,数采程序关闭 - 开始。")
+				c_log.GlobalLogger.Info("数采程序关闭 - 开始。")
 				stopMasterNode()
 			}
 		} else if localStatus == "error" {

+ 60 - 16
aarch64/pjibot_guide/control/main.go

@@ -8,6 +8,8 @@ import (
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
+	commonUtil "cicv-data-closedloop/common/util"
+	"encoding/json"
 	"fmt"
 	"net/rpc"
 	"os"
@@ -16,20 +18,22 @@ import (
 )
 
 var (
-	applicationName = "pji-control"
-	localStatus     = "idle"
-	cloudStatus     = "NONE"
-	lastLocalStatus = "idle"
-	lastCloudStatus = "NONE"
-	localTurnLength = 1  // s,本地状态刷新时间
-	cloudTurnLength = 60 // s,云端状态刷新时间
-	renewTurnLength = 3  // s,续约状态刷新时间
-	waitStopLength  = 1  // min,停止master前等待时间
-	launchedFlag    = false
-	renewedFlag     = false
-	renewTimer      *time.Timer // 续约定时器
-	RenewDur        = 5         // min, 续约时间
-	maxRetryCount   = 10        // 查询配置最大重试次数
+	applicationName  = "pji-control"
+	localStatus      = "idle"
+	cloudStatus      = "NONE"
+	lastLocalStatus  = "idle"
+	lastCloudStatus  = "NONE"
+	limitReachedFlag = false
+	localTurnLength  = 1  // s,本地状态刷新时间
+	cloudTurnLength  = 60 // s,云端状态刷新时间
+	renewTurnLength  = 3  // s,续约状态刷新时间
+	waitStopLength   = 1  // min,停止master前等待时间
+	checkLimitLength = 1  // min, 查询是否达到采集限制等待时间
+	launchedFlag     = false
+	renewedFlag      = false
+	renewTimer       *time.Timer // 续约定时器
+	RenewDur         = 5         // min, 续约时间
+	maxRetryCount    = 10        // 查询配置最大重试次数
 )
 
 func init() {
@@ -48,6 +52,43 @@ func init() {
 	commonConfig.InitWebsocketConfig()
 }
 
+func checkCollectLimit() {
+	for {
+		time.Sleep(time.Duration(checkLimitLength) * time.Minute)
+		// 如果开启了采集频率限制,则云端判断采集数量是否超过限额
+		if commonConfig.CloudConfig.CollectLimit.Flag == 1 {
+			//c_log.GlobalLogger.Error("当前设备已开启数采频率限制,需判断采集数量是否达到限额。")
+			responseString, err := commonUtil.HttpPostJsonWithHeaders(
+				commonConfig.CloudConfig.CollectLimit.Url,
+				map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+				map[string]string{
+					"snCode":            commonConfig.LocalConfig.SecretKey,
+					"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+					"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+					"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+					"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+				},
+			)
+			if err != nil {
+				c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+				continue
+			}
+			// 解析JSON字符串到Response结构体
+			var resp entity.Response
+			err = json.Unmarshal([]byte(responseString), &resp)
+			if err != nil {
+				c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+				continue
+			}
+			if resp.Code != 200 { // 不是200 代表采集数量已超过限额不允许采集
+				//c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
+				limitReachedFlag = true
+			}
+		}
+		limitReachedFlag = false
+	}
+}
+
 func initRenew() {
 	c_log.GlobalLogger.Info("启动定时器 - 开始。")
 	if renewTimer != nil {
@@ -137,6 +178,9 @@ func main() {
 	// 定期检查本地任务状态,执行续约,避免短时间内多次启停
 	go renew()
 
+	// 检查是否达到限额
+	go checkCollectLimit()
+
 	// 云端任务状态负责更新配置
 	go pkg.GetCloudConfig(&cloudStatus, &lastCloudStatus, cloudTurnLength)
 
@@ -161,12 +205,12 @@ func main() {
 		// error 错误状态,此状态下不启动数采任务
 
 		// 本地任务状态负责启停master
-		if localStatus == "running" {
+		if localStatus == "running" && !limitReachedFlag {
 			if !launchedFlag { // 目前未启动数采程序
 				c_log.GlobalLogger.Info("数采程序启动 - 开始。")
 				startMasterNode()
 			}
-		} else if localStatus == "idle" {
+		} else if localStatus == "idle" || limitReachedFlag {
 			if !renewedFlag && launchedFlag && len(entity.TimeWindowConsumerQueue) == 0 {
 				time.Sleep(time.Duration(waitStopLength) * time.Minute)
 				c_log.GlobalLogger.Info("设备不在运行状态且没有待处理的数据,数采程序关闭 - 开始。")