Pārlūkot izejas kodu

feat: 采集时间限制

HeWang 5 mēneši atpakaļ
vecāks
revīzija
493e723c75

+ 27 - 0
aarch64/pjibot_delivery/common/config/c_cloud.go

@@ -65,8 +65,18 @@ type CollectLimitStruct struct {
 	Year  int    `yaml:"year"`
 }
 
+type CollectWindowStruct struct {
+	Flag      int      `yaml:"flag"`
+	Days      []string `yaml:"days,omitempty"`
+	Start     string   `yaml:"start_time"`
+	End       string   `yaml:"end_time"`
+	StartTime time.Time
+	EndTime   time.Time
+}
+
 type cloudConfig struct {
 	CollectLimit          CollectLimitStruct   `yaml:"collect-limit"`
+	CollectWindow         CollectWindowStruct  `yaml:"collect-window"`
 	CollectNumPlus        CollectNumPlusStruct `yaml:"collect-num-plus"`
 	HasOneMsgTopic        bool                 `yaml:"has-one-msg-topic"` // 是否存在只发单帧的话题
 	FullCollect           bool                 `yaml:"full-collect"`
@@ -170,6 +180,23 @@ func InitCloudConfig() {
 		c_log.GlobalLogger.Error("程序崩溃,配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
 		os.Exit(-1)
 	}
+	// 单独解析采集时间
+	startTime, err := time.Parse("11:11", newCloudConfig.CollectWindow.Start)
+	if err != nil {
+		c_log.GlobalLogger.Error("云端配置文件解析采集时间【startTime】失败 ", err, "取默认值【00:00】")
+		newCloudConfig.CollectWindow.StartTime, _ = time.Parse("11:11", "00:00")
+	}
+	newCloudConfig.CollectWindow.StartTime = startTime
+	endTime, err := time.Parse("11:11", newCloudConfig.CollectWindow.End)
+	if err != nil {
+		c_log.GlobalLogger.Error("云端配置文件解析采集时间【endTime】失败 ", err, "取默认值【23:59】")
+		newCloudConfig.CollectWindow.EndTime, _ = time.Parse("11:11", "23:59")
+	}
+	newCloudConfig.CollectWindow.EndTime = endTime
+	if len(newCloudConfig.CollectWindow.Days) == 0 {
+		// 默认设置为每天
+		newCloudConfig.CollectWindow.Days = []string{"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"}
+	}
 
 	// 5 ------- 校验 yaml -------
 	if checkCloudConfig(newCloudConfig) {

+ 17 - 0
aarch64/pjibot_delivery/common/config/c_platform.go

@@ -193,3 +193,20 @@ func checkPlatformConfig() bool {
 	}
 	return true
 }
+
+func CheckPlatformConfigStatus(maxRetryCount int) bool {
+	var err error
+	for i := 0; i < maxRetryCount; i++ {
+		time.Sleep(time.Duration(2) * time.Second)
+		// 判断是否有配置,第一次访问状态应该为:CHANGE(一共三种状态 CHANGE|UNCHANGE|NONE)
+		PlatformConfig, err = getConfig()
+		if err != nil {
+			c_log.GlobalLogger.Error("获取配置status失败:", err)
+			continue
+		}
+		if checkPlatformConfig() {
+			return true
+		}
+	}
+	return false
+}

+ 166 - 71
aarch64/pjibot_delivery/control/main.go

@@ -4,15 +4,33 @@ import (
 	commonConfig "cicv-data-closedloop/aarch64/pjibot_delivery/common/config"
 	commonService "cicv-data-closedloop/aarch64/pjibot_delivery/common/service"
 	"cicv-data-closedloop/aarch64/pjibot_delivery/common/variable"
+	"cicv-data-closedloop/aarch64/pjibot_delivery/control/pkg"
 	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
+	"fmt"
 	"net/rpc"
 	"os"
 	"runtime"
 	"time"
 )
 
-var applicationName = "pji-control"
+var (
+	applicationName = "pji-control"
+	localStatus     = "idle"
+	cloudStatus     = "NONE"
+	lastLocalStatus = "idle"
+	lastCloudStatus = "NONE"
+	localTurnLength = 1  // s,本地状态刷新时间
+	cloudTurnLength = 60 // s,云端状态刷新时间
+	renewTurnLength = 3  // s,续约状态刷新时间
+	waitStopLength  = 1  // min,停止master前等待时间
+	launchedFlag    = false
+	renewedFlag     = false
+	renewTimer      *time.Timer // 续约定时器
+	RenewDur        = 5         // min, 续约时间
+	maxRetryCount   = 10        // 查询配置最大重试次数
+)
 
 func init() {
 	runtime.GOMAXPROCS(1)
@@ -28,85 +46,162 @@ func init() {
 	commonConfig.InitWebsocketConfig()
 }
 
+func IsTimeAllowed(currentTime time.Time) bool {
+	cw := commonConfig.CloudConfig.CollectWindow
+	if cw.Flag == 0 { // 关闭固定时间段采集, 则都返回true
+		return true
+	}
+	if len(cw.Days) > 0 { // 如果指定了周几
+		currentDay := currentTime.Weekday().String()
+		included := false
+		for _, day := range cw.Days {
+			if day == currentDay {
+				included = true
+				break
+			}
+		}
+		if !included {
+			return false
+		}
+	}
+
+	start := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), cw.StartTime.Hour(), cw.StartTime.Minute(), cw.StartTime.Second(), 0, currentTime.Location())
+	end := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), cw.EndTime.Hour(), cw.EndTime.Minute(), cw.EndTime.Second(), 0, currentTime.Location())
+
+	if start.After(end) { // 如果时段跨天
+		end = end.AddDate(0, 0, 1)
+	}
+	return !currentTime.Before(start) && currentTime.Before(end)
+}
+
+func initRenew() {
+	c_log.GlobalLogger.Info("启动定时器 - 开始。")
+	if renewTimer != nil {
+		renewTimer.Stop()
+	}
+	renewedFlag = true
+	renewTimer = time.AfterFunc(time.Duration(RenewDur)*time.Minute, func() {
+		renewedFlag = false
+	})
+	c_log.GlobalLogger.Infof("定时时间【%v】分钟 - 成功。", RenewDur)
+}
+
+func renew() {
+	for {
+		time.Sleep(time.Duration(renewTurnLength) * time.Second)
+		if localStatus == "running" && launchedFlag && !renewedFlag { // 设备处于运行状态,数采程序已启动,且尚未续约
+			c_log.GlobalLogger.Info("设备仍处于运行状态,续约 - 开始。")
+			if renewTimer != nil {
+				renewTimer.Stop()
+			}
+			renewedFlag = true
+			renewTimer = time.AfterFunc(time.Duration(RenewDur)*time.Minute, func() {
+				renewedFlag = false
+			})
+			c_log.GlobalLogger.Infof("续约时间【%v】分钟 - 成功。", RenewDur)
+		}
+	}
+}
+
+func startMasterNode() {
+	c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
+
+	if commonConfig.CheckPlatformConfigStatus(maxRetryCount) {
+		c_log.GlobalLogger.Info("查询到数据闭环平台有配置任务。")
+		commonConfig.InitPlatformConfig()
+
+		if _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...); err != nil {
+			c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
+			os.Exit(-1)
+		}
+		c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
+
+		initRenew()
+		launchedFlag = true
+
+		c_log.GlobalLogger.Info("数采程序启动 - 成功。")
+	} else {
+		c_log.GlobalLogger.Error("查询到数据闭环平台没有配置任务,不启动数采程序。")
+	}
+}
+
+func stopMasterNode() {
+	// 发送rpc信号杀死采集程序
+	var killArgs commonService.KillSignal
+	killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
+	c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号:", killArgs)
+	KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
+	if err != nil {
+		// 此处如果连接失败说明采集程序已经停止了
+		lastCloudStatus = "NONE"
+		c_log.GlobalLogger.Error("采集程序已经停止:", err)
+		return
+	}
+
+	reply := 0
+	if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
+		c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
+	}
+
+	c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
+	commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
+	if err = KillRpcClient.Close(); err != nil {
+		// 不做处理
+	}
+
+	launchedFlag = false
+	c_log.GlobalLogger.Info("数采程序关闭 - 成功。")
+}
+
 func main() {
-	init := true
-	turnLength := 60
-	lastStatus := "NONE"
-	//  轮询任务接口判断是否有更新
+	// 更新本地任务状态
+	go pkg.GetLocalStatus(&localStatus, &lastLocalStatus, localTurnLength)
+	// 更新云端任务状态
+	go pkg.GetCloudStatus(&cloudStatus, &lastCloudStatus, cloudTurnLength)
+
+	// 定期检查本地任务状态,执行续约,避免短时间内多次启停
+	go renew()
+
+	// 云端任务状态负责更新配置
+	go pkg.GetCloudConfig(&cloudStatus, &lastCloudStatus, cloudTurnLength)
+
 	for {
-		c_log.GlobalLogger.Errorf("一轮次扫描时间【%v】秒:", turnLength)
-		if init {
-			time.Sleep(time.Duration(1) * time.Second)
-			init = false
+		if launchedFlag { // 当前已启动master节点
+			time.Sleep(time.Duration(cloudTurnLength) * time.Second)
 		} else {
-			time.Sleep(time.Duration(turnLength) * time.Second)
-		}
-		// 1 获取当前设备的任务的 status
-		status, err := commonConfig.GetStatus(commonConfig.PlatformConfig.TaskConfigId)
-		if err != nil {
-			c_log.GlobalLogger.Error("获取配置status失败:", err)
-			continue
+			time.Sleep(time.Duration(localTurnLength) * time.Second)
 		}
-		// 2 判断 status
-		// UN_CHANGE 没有新的任务,无需更改
-		// CHANGE 有新的任务,需要杀死旧的任务并重启
-		// NONE 设备没有配置任务,需要杀死旧的任务
-		if status == "UN_CHANGE" {
-			lastStatus = "UN_CHANGE"
-			continue
-		} else if status == "CHANGE" || status == "NONE" {
-			if lastStatus == "CHANGE" && status == "CHANGE" { // 供更新使用
-				commonConfig.InitPlatformConfig()
-				continue
-			}
-			if lastStatus == "NONE" && status == "NONE" {
-				continue
-			}
-			// 3 发送rpc信号杀死采集程序
-			if lastStatus == "NONE" && status == "CHANGE" {
-				if _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...); err != nil {
-					c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
-					os.Exit(-1)
-				}
-				c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
-				lastStatus = status
-				c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
-				commonConfig.InitPlatformConfig()
-				continue
-			}
-			var killArgs commonService.KillSignal
-			if lastStatus == "UN_CHANGE" && status == "CHANGE" {
-				killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: true}
-				c_log.GlobalLogger.Info("更新任务,发送rpc重启信号:", killArgs)
-			}
-			if lastStatus == "UN_CHANGE" && status == "NONE" {
-				killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
-				c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号:", killArgs)
-			}
 
-			KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
-			if err != nil {
-				// 此处如果连接失败说明采集程序已经停止了
-				lastStatus = "NONE"
-				c_log.GlobalLogger.Error("采集程序已经停止:", err)
-				continue
-			}
+		fmt.Println("localStatus: ", localStatus, "lastLocalStatus: ", lastLocalStatus)
+		fmt.Println("cloudStatus: ", cloudStatus, "lastCloudStatus: ", lastCloudStatus)
+
+		// 综合判断 cloudStatus 和 localStatus
+		// cloudStatus
+		// UN_CHANGE 没有新的任务,无需更改
+		// CHANGE 有新的任务,需要杀死旧的数采任务并重启
+		// NONE 设备没有配置任务,需要杀死旧的数采任务
+		// localStatus
+		// idle 空闲状态,此状态下不启动数采任务
+		// running 繁忙状态,此状态需要启动数采任务
+		// error 错误状态,此状态下不启动数采任务
 
-			reply := 0
-			if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
-				c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
-				// 这里可能会报错unexpected EOF但是不影响,先注释 close 和 continue
-				//KillRpcClient.Close()
-				//continue
+		// 本地任务状态负责启停master
+		if localStatus == "running" {
+			if !launchedFlag { // 目前未启动数采程序
+				c_log.GlobalLogger.Info("数采程序启动 - 开始。")
+				startMasterNode()
 			}
-			lastStatus = status
-			c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
-			commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
-			if err = KillRpcClient.Close(); err != nil {
-				// 不做处理
+		} else if localStatus == "idle" {
+			if !renewedFlag && launchedFlag && len(entity.TimeWindowConsumerQueue) == 0 {
+				time.Sleep(time.Duration(waitStopLength) * time.Minute)
+				c_log.GlobalLogger.Info("设备不在运行状态且没有待处理的数据,数采程序关闭 - 开始。")
+				stopMasterNode()
 			}
+		} else if localStatus == "error" {
+			c_log.GlobalLogger.Error("设备运行状态出错,停止数采程序。")
+			stopMasterNode()
 		} else {
-			c_log.GlobalLogger.Error("未知的采集任务状态。【status】=", status)
+			c_log.GlobalLogger.Error("未知的设备运行状态。【status】=", localStatus)
 		}
 	}
 }

+ 45 - 0
aarch64/pjibot_delivery/control/pkg/judge_cloud.go

@@ -0,0 +1,45 @@
+package pkg
+
+import (
+	commonConfig "cicv-data-closedloop/aarch64/pjibot_delivery/common/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"time"
+)
+
+var (
+	maxRetryCount = 10
+)
+
+// UN_CHANGE 没有新的任务
+// CHANGE 有新的任务
+// NONE 设备没有配置任务
+func GetCloudStatus(cloudStatus *string, lastCloudStatus *string, turnLength int) {
+	// 轮询云端任务状态
+	for {
+		time.Sleep(time.Duration(turnLength) * time.Second)
+
+		*lastCloudStatus = *cloudStatus
+		taskStatus, err := commonConfig.GetStatus(commonConfig.PlatformConfig.TaskConfigId)
+		if err != nil {
+			c_log.GlobalLogger.Error("获取云端配置status失败:", err)
+			continue
+		}
+		if taskStatus == "" || taskStatus == " " {
+			taskStatus = "NONE"
+		}
+		*cloudStatus = taskStatus
+	}
+}
+
+func GetCloudConfig(cloudStatus *string, lastCloudStatus *string, turnLength int) {
+	for {
+		time.Sleep(time.Duration(turnLength) * time.Second)
+		if *cloudStatus == "CHANGE" {
+			c_log.GlobalLogger.Error("cloudStatus:", *cloudStatus)
+			if commonConfig.CheckPlatformConfigStatus(maxRetryCount) {
+				c_log.GlobalLogger.Info("查询到数据闭环平台有配置任务。")
+				commonConfig.InitPlatformConfig()
+			}
+		}
+	}
+}

+ 44 - 0
aarch64/pjibot_delivery/control/pkg/judge_local.go

@@ -0,0 +1,44 @@
+package pkg
+
+import (
+	"cicv-data-closedloop/aarch64/pjibot_delivery/common/config"
+	"encoding/json"
+	"fmt"
+	"log"
+	"time"
+)
+
+// idle 空闲状态,此状态下机器人可进行任务下发
+// running 繁忙状态,此状态机器人不接受新任务
+// error 错误状态(硬件,不能正常工作的)
+func GetLocalStatus(localStatus *string, lastLocalStatus *string, turnLength int) {
+	defer config.WsConn.Close()
+	// 轮询本地任务状态
+	for {
+		time.Sleep(time.Duration(turnLength) * time.Second)
+
+		*lastLocalStatus = *localStatus
+		_, msg, err := config.WsConn.ReadMessage()
+		if err != nil {
+			log.Println("Error in receive:", err)
+			continue
+		}
+		//log.Printf("Received: %s\n", msg)
+
+		// 将响应字节解码为JSON
+		var statusMessage config.StatusMessage
+		err = json.Unmarshal(msg, &statusMessage)
+		if err != nil {
+			log.Println("Error in json:", err)
+			continue
+		}
+
+		if statusMessage.Type == "push" && statusMessage.Topic == "robotStatus" {
+			//fmt.Println("statusMessage:", statusMessage)
+			data := statusMessage.Data.(map[string]interface{})
+			//fmt.Println("statusMessage.Data", data)
+			fmt.Println("statusMessage.Data[\"status\"]", data["status"])
+			*localStatus = data["status"].(string)
+		}
+	}
+}

+ 5 - 0
aarch64/pjibot_delivery/配送机器人默认配置文件-cloud-config.yaml

@@ -6,6 +6,11 @@ collect-limit:
   week: 7
   month: 30
   year: 365
+collect-window:
+  flag: 1 # 时间段限制标志 0 - 关闭固定时间段采集 1 - 开启固定时间段采集
+  days: [] # 可选字段:"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday",空值默认为每天
+  start_time: "09:00"
+  end_time: "17:00"
 collect-num-plus:
   url: http://36.110.106.142:12341/web_server/collect_limit/plus_collect_num
 monitor:

+ 0 - 0
tools/pji_api/main/main.go → tools/pji/pji_api/main/main.go


+ 85 - 0
tools/pji/pji_time/main/main.go

@@ -0,0 +1,85 @@
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+type CollectWindowStruct struct {
+	Flag      int      `yaml:"flag"`
+	Days      []string `yaml:"days,omitempty"`
+	Start     string   `yaml:"start_time"`
+	End       string   `yaml:"end_time"`
+	StartTime time.Time
+	EndTime   time.Time
+}
+
+func IsTimeAllowed(currentTime time.Time) bool {
+	// 模拟解析数据
+	cw := CollectWindowStruct{
+		Flag:  1,
+		Days:  []string{},
+		Start: "09:00",
+		End:   "17:00",
+	}
+	// 单独解析采集时间
+	startTime, err := time.Parse("11:11", cw.Start)
+	if err != nil {
+		fmt.Println("云端配置文件解析采集时间【startTime】失败 ", err, "取默认值【00:00】")
+		cw.StartTime, _ = time.Parse("11:11", "00:00")
+	}
+	cw.StartTime = startTime
+	endTime, err := time.Parse("11:11", cw.End)
+	if err != nil {
+		fmt.Println("云端配置文件解析采集时间【endTime】失败 ", err, "取默认值【23:59】")
+		cw.EndTime, _ = time.Parse("11:11", "23:59")
+	}
+	cw.EndTime = endTime
+	if len(cw.Days) == 0 {
+		// 默认设置为每天
+		cw.Days = []string{"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"}
+	}
+
+	// todo 待测逻辑
+	if cw.Flag == 0 { // 关闭固定时间段采集, 则都返回true
+		fmt.Println("已关闭采集时间段限制")
+		return true
+	}
+	if len(cw.Days) > 0 { // 如果指定了周几
+		currentDay := currentTime.Weekday().String()
+		fmt.Println("currentDay", currentDay)
+		included := false
+		for _, day := range cw.Days {
+			if day == currentDay {
+				fmt.Println("当前时间符合规定的日期要求")
+				included = true
+				break
+			}
+		}
+		if !included {
+			fmt.Println("当前时间不符合规定的日期要求")
+			return false
+		}
+	}
+
+	start := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), cw.StartTime.Hour(), cw.StartTime.Minute(), cw.StartTime.Second(), 0, currentTime.Location())
+	end := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), cw.EndTime.Hour(), cw.EndTime.Minute(), cw.EndTime.Second(), 0, currentTime.Location())
+
+	fmt.Println("start", start)
+	fmt.Println("end", end)
+
+	if start.After(end) { // 如果时段跨天
+		end = end.AddDate(0, 0, 1)
+		fmt.Println("当前时间设置的时间段跨天,更正end时间", end)
+	}
+	return !currentTime.Before(start) && currentTime.Before(end)
+}
+
+func main() {
+	flag := IsTimeAllowed(time.Now())
+	if flag {
+		fmt.Println("当前时间段符合规定,允许采集")
+	} else {
+		fmt.Println("当前时间段不符合规定,不允许采集")
+	}
+}

+ 0 - 0
tools/pji_websocket/main/main.go → tools/pji/pji_websocket/main/main.go