Эх сурвалжийг харах

refactor: Modify the master start-stop logic

HeWang 7 сар өмнө
parent
commit
e52a8c429f

+ 118 - 80
aarch64/pjibot_guide/control/main.go

@@ -18,6 +18,9 @@ var (
 	applicationName = "pji-control"
 	localStatus     = "idle"
 	cloudStatus     = "NONE"
+	lastLocalStatus = "idle"
+	lastCloudStatus = "NONE"
+	launchedFlag    = false
 )
 
 func init() {
@@ -37,108 +40,143 @@ func init() {
 }
 
 func main() {
-	localTurnLength := 2
-	cloudTurnLength := 60
+	localTurnLength := 2  // s
+	cloudTurnLength := 60 // s
 	overallTurnLength := localTurnLength
-	//lastCloudStatus := "NONE"
+	stopTimeWindow := 10 // min
 
-	go pkg.GetLocalStatus(&localStatus, localTurnLength)
-	go pkg.GetCloudStatus(&cloudStatus, cloudTurnLength)
+	// 更新本地任务状态
+	go pkg.GetLocalStatus(&localStatus, &lastLocalStatus, localTurnLength)
+	// 更新云端任务状态
+	go pkg.GetCloudStatus(&cloudStatus, &lastCloudStatus, cloudTurnLength)
 
 	for {
 		time.Sleep(time.Duration(overallTurnLength) * time.Second)
-		fmt.Println("localStatus: ", localStatus)
-		fmt.Println("cloudStatus: ", cloudStatus)
-		//if cloudStatus == "UN_CHANGE" {
-		//	lastCloudStatus = "UN_CHANGE"
-		//	continue
-		//}
-	}
-	//select {}
-}
+		fmt.Println("localStatus: ", localStatus, "lastLocalStatus: ", lastLocalStatus)
+		fmt.Println("cloudStatus: ", cloudStatus, "lastCloudStatus: ", lastCloudStatus)
 
-func judgeState1() {
-	init := true
-	turnLength := 60
-	lastStatus := "NONE"
-	//  轮询任务接口判断是否有更新
-	for {
-		c_log.GlobalLogger.Errorf("一轮次扫描时间【%v】秒:", turnLength)
-		if init {
-			time.Sleep(time.Duration(1) * time.Second)
-			init = false
-		} else {
-			time.Sleep(time.Duration(turnLength) * time.Second)
-		}
-		// 1 获取当前设备的任务的 status
-		status, err := commonConfig.GetStatus(commonConfig.PlatformConfig.TaskConfigId)
-		if err != nil {
-			c_log.GlobalLogger.Error("获取配置status失败:", err)
-			continue
-		}
-		// 2 判断 status
+		// 综合判断 cloudStatus 和 localStatus
+		// cloudStatus
 		// UN_CHANGE 没有新的任务,无需更改
-		// CHANGE 有新的任务,需要杀死旧的任务并重启
-		// NONE 设备没有配置任务,需要杀死旧的任务
-		if status == "UN_CHANGE" {
-			lastStatus = "UN_CHANGE"
-			continue
-		} else if status == "CHANGE" || status == "NONE" {
-			if lastStatus == "CHANGE" && status == "CHANGE" { // 供更新使用
-				commonConfig.InitPlatformConfig()
-				continue
-			}
-			if lastStatus == "NONE" && status == "NONE" {
-				continue
-			}
+		// CHANGE 有新的任务,需要杀死旧的数采任务并重启
+		// NONE 设备没有配置任务,需要杀死旧的数采任务
+		// localStatus
+		// idle 空闲状态,此状态下不启动数采任务
+		// running 繁忙状态,此状态需要启动数采任务
+		// error 错误状态,此状态下不启动数采任务
 
-			if lastStatus == "NONE" && status == "CHANGE" {
+		// 本地任务状态负责启停master
+		if lastLocalStatus == "idle" && localStatus == "running" {
+			if !launchedFlag { // 目前未启动数采程序
 				if _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...); err != nil {
 					c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
 					os.Exit(-1)
 				}
 				c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
-				lastStatus = status
 				c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
 				commonConfig.InitPlatformConfig()
-				continue
+				launchedFlag = true
 			}
+			continue
+		}
 
-			// 3 发送rpc信号杀死采集程序
-			var killArgs commonService.KillSignal
-			if lastStatus == "UN_CHANGE" && status == "CHANGE" {
-				killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: true}
-				c_log.GlobalLogger.Info("更新任务,发送rpc重启信号:", killArgs)
-			}
-			if lastStatus == "UN_CHANGE" && status == "NONE" {
+		if lastLocalStatus == "running" && localStatus == "idle" {
+			if launchedFlag {
+				// 等待数据上传(如果有触发)
+				time.Sleep(time.Duration(stopTimeWindow) * time.Minute)
+				// 发送rpc信号杀死采集程序
+				var killArgs commonService.KillSignal
 				killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
 				c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号:", killArgs)
-			}
+				KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
+				if err != nil {
+					// 此处如果连接失败说明采集程序已经停止了
+					lastCloudStatus = "NONE"
+					c_log.GlobalLogger.Error("采集程序已经停止:", err)
+					continue
+				}
 
-			KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
-			if err != nil {
-				// 此处如果连接失败说明采集程序已经停止了
-				lastStatus = "NONE"
-				c_log.GlobalLogger.Error("采集程序已经停止:", err)
-				continue
-			}
+				reply := 0
+				if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
+					c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
+					// 这里可能会报错unexpected EOF但是不影响,先注释 close 和 continue
+					//KillRpcClient.Close()
+					//continue
+				}
 
-			reply := 0
-			if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
-				c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
-				// 这里可能会报错unexpected EOF但是不影响,先注释 close 和 continue
-				//KillRpcClient.Close()
-				//continue
-			}
-			lastStatus = status
-			c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
-			commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
-			if err = KillRpcClient.Close(); err != nil {
-				// 不做处理
+				c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
+				commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
+				if err = KillRpcClient.Close(); err != nil {
+					// 不做处理
+				}
+				launchedFlag = false
 			}
-		} else {
-			c_log.GlobalLogger.Error("未知的采集任务状态。【status】=", status)
+			continue
 		}
+
+		// 云端任务状态负责更新配置
+		if cloudStatus == "CHANGE" {
+			commonConfig.InitPlatformConfig()
+			continue
+		}
+
+		//if cloudStatus == "UN_CHANGE" {
+		//	continue
+		//} else if cloudStatus == "CHANGE" || cloudStatus == "NONE" {
+		//	if lastCloudStatus == "CHANGE" && cloudStatus == "CHANGE" { // 供更新使用
+		//		commonConfig.InitPlatformConfig()
+		//		continue
+		//	}
+		//	if lastCloudStatus == "NONE" && cloudStatus == "NONE" {
+		//		continue
+		//	}
+		//
+		//	if lastCloudStatus == "NONE" && cloudStatus == "CHANGE" {
+		//		if _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...); err != nil {
+		//			c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
+		//			os.Exit(-1)
+		//		}
+		//		c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
+		//		c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
+		//		commonConfig.InitPlatformConfig()
+		//		continue
+		//	}
+		//
+		//	// 3 发送rpc信号杀死采集程序
+		//	var killArgs commonService.KillSignal
+		//	if lastCloudStatus == "UN_CHANGE" && cloudStatus == "CHANGE" {
+		//		killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: true}
+		//		c_log.GlobalLogger.Info("更新任务,发送rpc重启信号:", killArgs)
+		//	}
+		//	if lastCloudStatus == "UN_CHANGE" && cloudStatus == "NONE" {
+		//		killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
+		//		c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号:", killArgs)
+		//	}
+		//
+		//	KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
+		//	if err != nil {
+		//		// 此处如果连接失败说明采集程序已经停止了
+		//		lastCloudStatus = "NONE"
+		//		c_log.GlobalLogger.Error("采集程序已经停止:", err)
+		//		continue
+		//	}
+		//
+		//	reply := 0
+		//	if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
+		//		c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
+		//		// 这里可能会报错unexpected EOF但是不影响,先注释 close 和 continue
+		//		//KillRpcClient.Close()
+		//		//continue
+		//	}
+		//
+		//	c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
+		//	commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
+		//	if err = KillRpcClient.Close(); err != nil {
+		//		// 不做处理
+		//	}
+		//} else {
+		//	c_log.GlobalLogger.Error("未知的采集任务状态。【cloudStatus】=", cloudStatus)
+		//}
 	}
+	//select {}
 }
-func judgeState3() {}

+ 4 - 2
aarch64/pjibot_guide/control/pkg/judge_cloud.go

@@ -9,15 +9,17 @@ import (
 // UN_CHANGE 没有新的任务
 // CHANGE 有新的任务
 // NONE 设备没有配置任务
-func GetCloudStatus(status *string, turnLength int) {
+func GetCloudStatus(cloudStatus *string, lastCloudStatus *string, turnLength int) {
 	// 轮询云端任务状态
 	for {
 		time.Sleep(time.Duration(turnLength) * time.Second)
+
+		*lastCloudStatus = *cloudStatus
 		taskStatus, err := commonConfig.GetStatus(commonConfig.PlatformConfig.TaskConfigId)
 		if err != nil {
 			c_log.GlobalLogger.Error("获取云端配置status失败:", err)
 			continue
 		}
-		*status = taskStatus
+		*cloudStatus = taskStatus
 	}
 }

+ 7 - 5
aarch64/pjibot_guide/control/pkg/judge_local.go

@@ -8,13 +8,15 @@ import (
 	"time"
 )
 
-// idle 空闲状态,此状态下可进行任务下发
-// processing 繁忙状态,此状态不接受新任务
+// idle 空闲状态,此状态下机器人可进行任务下发
+// running 繁忙状态,此状态机器人不接受新任务
 // error 错误状态(硬件,不能正常工作的)
-func GetLocalStatus(status *string, turnLength int) {
+func GetLocalStatus(localStatus *string, lastLocalStatus *string, turnLength int) {
 	// 轮询本地任务状态
 	for {
 		time.Sleep(time.Duration(turnLength) * time.Second)
+
+		*lastLocalStatus = *localStatus
 		_, msg, err := config.WsConn.ReadMessage()
 		if err != nil {
 			log.Println("Error in receive:", err)
@@ -33,8 +35,8 @@ func GetLocalStatus(status *string, turnLength int) {
 			fmt.Println("statusMessage:", statusMessage)
 			data := statusMessage.Data.(map[string]interface{})
 			fmt.Println("statusMessage.Data", data)
-			fmt.Println("statusMessage.Data[\"taskStatus\"]", data["status"])
-			*status = data["status"].(string)
+			fmt.Println("statusMessage.Data[\"status\"]", data["status"])
+			*localStatus = data["status"].(string)
 		}
 	}
 }