Sfoglia il codice sorgente

refactor: Modify the master start-stop logic

HeWang 7 mesi fa
parent
commit
f5630d4c67

+ 82 - 46
aarch64/pjibot_guide/control/main.go

@@ -6,6 +6,7 @@ import (
 	"cicv-data-closedloop/aarch64/pjibot_guide/common/variable"
 	"cicv-data-closedloop/aarch64/pjibot_guide/control/pkg"
 	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
 	"fmt"
 	"net/rpc"
@@ -21,7 +22,10 @@ var (
 	lastLocalStatus = "idle"
 	lastCloudStatus = "NONE"
 	launchedFlag    = false
+	renewedFlag     = false
 	configuredFlag  = false
+	renewTimer      *time.Timer // 续约定时器
+	RenewDur        = 5         // 续约时间为5分钟
 )
 
 func init() {
@@ -40,12 +44,64 @@ func init() {
 	commonConfig.InitWebsocketConfig()
 }
 
+func renew() {
+	if renewTimer != nil {
+		renewTimer.Stop()
+	}
+	renewedFlag = true
+	renewTimer = time.AfterFunc(time.Duration(RenewDur)*time.Minute, func() {
+		renewedFlag = false
+	})
+	c_log.GlobalLogger.Infof("续约时间【%v】分钟 - 成功。", RenewDur)
+}
+
+func startMasterNode() {
+	if _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...); err != nil {
+		c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
+		os.Exit(-1)
+	}
+	c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
+	c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
+	commonConfig.InitPlatformConfig()
+
+	launchedFlag = true
+	c_log.GlobalLogger.Info("数采程序启动 - 成功。")
+}
+
+func stopMasterNode() {
+	// 发送rpc信号杀死采集程序
+	var killArgs commonService.KillSignal
+	killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
+	c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号:", killArgs)
+	KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
+	if err != nil {
+		// 此处如果连接失败说明采集程序已经停止了
+		lastCloudStatus = "NONE"
+		c_log.GlobalLogger.Error("采集程序已经停止:", err)
+		return
+	}
+
+	reply := 0
+	if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
+		c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
+		// 这里可能会报错unexpected EOF但是不影响,先注释 close 和 continue
+		//KillRpcClient.Close()
+		//continue
+	}
+
+	c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
+	commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
+	if err = KillRpcClient.Close(); err != nil {
+		// 不做处理
+	}
+
+	launchedFlag = false
+	c_log.GlobalLogger.Info("数采程序关闭 - 成功。")
+}
+
 func main() {
 	localTurnLength := 2  // s
 	cloudTurnLength := 60 // s
-	//configTurnLength := 60 // s
-	overallTurnLength := localTurnLength
-	stopTime := 10 // min
 
 	// 更新本地任务状态
 	go pkg.GetLocalStatus(&localStatus, &lastLocalStatus, localTurnLength)
@@ -53,7 +109,12 @@ func main() {
 	go pkg.GetCloudStatus(&cloudStatus, &lastCloudStatus, cloudTurnLength)
 
 	for {
-		time.Sleep(time.Duration(overallTurnLength) * time.Second)
+		if launchedFlag { // 当前已启动master节点
+			time.Sleep(time.Duration(cloudTurnLength) * time.Second)
+		} else {
+			time.Sleep(time.Duration(localTurnLength) * time.Second)
+		}
+
 		fmt.Println("localStatus: ", localStatus, "lastLocalStatus: ", lastLocalStatus)
 		fmt.Println("cloudStatus: ", cloudStatus, "lastCloudStatus: ", lastCloudStatus)
 
@@ -70,54 +131,29 @@ func main() {
 		// 本地任务状态负责启停master
 		if localStatus == "running" {
 			if !launchedFlag { // 目前未启动数采程序
-				if _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...); err != nil {
-					c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
-					os.Exit(-1)
-				}
-				c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
-				c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
-				commonConfig.InitPlatformConfig()
-				launchedFlag = true
+				c_log.GlobalLogger.Info("数采程序启动 - 开始。")
+				startMasterNode()
+			} else { // 已启动数采程序
+				c_log.GlobalLogger.Info("设备仍处于运行状态,续约 - 开始。")
+				// 续约
+				renew()
 			}
 			continue
-		}
-
-		if lastLocalStatus == "running" && localStatus == "idle" {
-			if launchedFlag {
-				// 等待数据上传(如果有触发)
-				time.Sleep(time.Duration(stopTime) * time.Minute)
-				// 发送rpc信号杀死采集程序
-				var killArgs commonService.KillSignal
-				killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
-				c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号:", killArgs)
-				KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
-				if err != nil {
-					// 此处如果连接失败说明采集程序已经停止了
-					lastCloudStatus = "NONE"
-					c_log.GlobalLogger.Error("采集程序已经停止:", err)
-					continue
-				}
-
-				reply := 0
-				if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
-					c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
-					// 这里可能会报错unexpected EOF但是不影响,先注释 close 和 continue
-					//KillRpcClient.Close()
-					//continue
-				}
-
-				c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
-				commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
-				if err = KillRpcClient.Close(); err != nil {
-					// 不做处理
-				}
-				launchedFlag = false
+		} else if localStatus == "idle" {
+			if !renewedFlag && launchedFlag && len(entity.TimeWindowConsumerQueue) == 0 {
+				c_log.GlobalLogger.Info("设备不在运行状态且没有待处理的数据,数采程序关闭 - 开始。")
+				stopMasterNode()
 			}
 			continue
+		} else if localStatus == "error" {
+			c_log.GlobalLogger.Error("设备运行状态出错,停止数采程序。")
+			stopMasterNode()
+		} else {
+			c_log.GlobalLogger.Error("未知的设备运行状态。【status】=", localStatus)
 		}
 
-		//// 云端任务状态负责更新配置
-		//go pkg.GetCloudConfig(cloudStatus, lastCloudStatus, configTurnLength)
+		// 云端任务状态负责更新配置
+		go pkg.GetCloudConfig(cloudStatus, lastCloudStatus, cloudTurnLength)
 
 		//if cloudStatus == "UN_CHANGE" {
 		//	continue

+ 1 - 1
aarch64/pjibot_guide/control/pkg/judge_cloud.go

@@ -30,7 +30,7 @@ func GetCloudStatus(cloudStatus *string, lastCloudStatus *string, turnLength int
 func GetCloudConfig(cloudStatus string, lastCloudStatus string, turnLength int) {
 	for {
 		time.Sleep(time.Duration(turnLength) * time.Second)
-		if (lastCloudStatus == "NONE" || lastCloudStatus == "UN_CHANGE") && cloudStatus == "CHANGE" {
+		if cloudStatus == "CHANGE" {
 			commonConfig.InitPlatformConfig()
 		}
 	}