Переглянути джерело

Merge remote-tracking branch 'origin/master'

LingxinMeng 6 місяців тому
батько
коміт
dfba53e00f

+ 2 - 2
aarch64/pjibot_delivery/配送机器人默认配置文件-cloud-config.yaml

@@ -63,7 +63,7 @@ hosts:
         - "LD_LIBRARY_PATH=/opt/ros/noetic/lib:/opt/ros/noetic/lib/aarch64-linux-gnu"
         - "ROS_ROOT=/opt/ros/noetic/share/ros"
         - "ROS_DISTRO=noetic"
-    topics: # /robot_pose,/robot/realtime_cost_map_,/tracking/objects,/robot/TaskInfo,/robot/targetposition,/wheel,/wheel_odom,/robot/global_trajectory_,/robot/target_trajectories,/robot/evaluator_trajectories,/robot/final_trajectory,/nav/task_feedback_info,/cmd_vel,/imu,/nav/task_feedback_info,/velodyne_points
+    topics: # 这里加完了新话题还需要在数据闭环平台修改设备信息 # /robot_pose,/robot/realtime_cost_map_,/tracking/objects,/robot/TaskInfo,/robot/targetposition,/wheel,/wheel_odom,/robot/global_trajectory_,/robot/target_trajectories,/robot/evaluator_trajectories,/robot/final_trajectory,/nav/task_feedback_info,/cmd_vel,/imu,/nav/task_feedback_info,/velodyne_points
       - /robot_pose
       - /robot/realtime_cost_map_
       - /tracking/objects
@@ -75,7 +75,7 @@ hosts:
       - /robot/target_trajectories
       - /robot/evaluator_trajectories
       - /robot/final_trajectory
-      - /nav/task_feedback_info
+      - /nav/task_feedback_info·
       - /cmd_vel
       - /imu
       - /points_cluster

+ 66 - 70
aarch64/pjibot_guide/common/config/c_cloud.go

@@ -3,12 +3,8 @@ package config
 import (
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
-	"encoding/json"
 	"errors"
-	"fmt"
-	"github.com/gorilla/websocket"
 	"gopkg.in/yaml.v3"
-	"net/url"
 	"os"
 	"strings"
 	"sync"
@@ -88,24 +84,24 @@ type cloudConfig struct {
 	Monitor               MonitorStruct      `yaml:"monitor"`
 }
 
-// Request 结构体定义
-type Request struct {
-	Type      string      `json:"type"`
-	UUID      string      `json:"uuid"`
-	CommandID string      `json:"commandId"`
-	Parameter interface{} `json:"parameter"`
-}
-
-// Response 结构体定义
-type Response struct {
-	CommandID string            `json:"commandId"`
-	ErrorCode string            `json:"errorCode"`
-	Results   map[string]string `json:"results"`
-	Status    string            `json:"status"`
-	Time      int64             `json:"time"`
-	Type      string            `json:"type"`
-	UUID      string            `json:"uuid"`
-}
+//// Request 结构体定义
+//type Request struct {
+//	Type      string      `json:"type"`
+//	UUID      string      `json:"uuid"`
+//	CommandID string      `json:"commandId"`
+//	Parameter interface{} `json:"parameter"`
+//}
+//
+//// Response 结构体定义
+//type Response struct {
+//	CommandID string            `json:"commandId"`
+//	ErrorCode string            `json:"errorCode"`
+//	Results   map[string]string `json:"results"`
+//	Status    string            `json:"status"`
+//	Time      int64             `json:"time"`
+//	Type      string            `json:"type"`
+//	UUID      string            `json:"uuid"`
+//}
 
 var (
 	CloudConfig      cloudConfig
@@ -269,51 +265,51 @@ func getSnCode() (string, error) {
 	return snCode, nil
 }
 
-// SendWebsocketRequest 发送WebSocket请求并返回sn字段的值
-func SendWebsocketRequest(serverURL, path string, request Request) (string, error) {
-	// 构建WebSocket连接URL
-	u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
-
-	// 创建一个Dialer实例,用于建立WebSocket连接
-	dialer := websocket.Dialer{
-		ReadBufferSize:  1024,
-		WriteBufferSize: 1024,
-		// 可选:设置超时等
-		HandshakeTimeout: 5 * time.Second,
-	}
-
-	// 建立WebSocket连接
-	conn, _, err := dialer.Dial(u.String(), nil)
-	if err != nil {
-		return "", fmt.Errorf("dial: %w", err)
-	}
-	defer conn.Close()
-
-	// 将请求JSON编码为字节
-	requestJSON, err := json.Marshal(request)
-	if err != nil {
-		return "", fmt.Errorf("marshal request: %w", err)
-	}
-
-	// 发送WebSocket消息
-	err = conn.WriteMessage(websocket.TextMessage, requestJSON)
-	if err != nil {
-		return "", fmt.Errorf("write: %w", err)
-	}
-
-	// 读取WebSocket响应
-	_, responseBytes, err := conn.ReadMessage()
-	if err != nil {
-		return "", fmt.Errorf("read: %w", err)
-	}
-
-	// 将响应字节解码为JSON
-	var response Response
-	err = json.Unmarshal(responseBytes, &response)
-	if err != nil {
-		return "", fmt.Errorf("unmarshal response: %w", err)
-	}
-
-	// 返回sn字段的值
-	return response.Results["sn"], nil
-}
+//// SendWebsocketRequest 发送WebSocket请求并返回sn字段的值
+//func SendWebsocketRequest(serverURL, path string, request Request) (string, error) {
+//	// 构建WebSocket连接URL
+//	u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
+//
+//	// 创建一个Dialer实例,用于建立WebSocket连接
+//	dialer := websocket.Dialer{
+//		ReadBufferSize:  1024,
+//		WriteBufferSize: 1024,
+//		// 可选:设置超时等
+//		HandshakeTimeout: 5 * time.Second,
+//	}
+//
+//	// 建立WebSocket连接
+//	conn, _, err := dialer.Dial(u.String(), nil)
+//	if err != nil {
+//		return "", fmt.Errorf("dial: %w", err)
+//	}
+//	defer conn.Close()
+//
+//	// 将请求JSON编码为字节
+//	requestJSON, err := json.Marshal(request)
+//	if err != nil {
+//		return "", fmt.Errorf("marshal request: %w", err)
+//	}
+//
+//	// 发送WebSocket消息
+//	err = conn.WriteMessage(websocket.TextMessage, requestJSON)
+//	if err != nil {
+//		return "", fmt.Errorf("write: %w", err)
+//	}
+//
+//	// 读取WebSocket响应
+//	_, responseBytes, err := conn.ReadMessage()
+//	if err != nil {
+//		return "", fmt.Errorf("read: %w", err)
+//	}
+//
+//	// 将响应字节解码为JSON
+//	var response Response
+//	err = json.Unmarshal(responseBytes, &response)
+//	if err != nil {
+//		return "", fmt.Errorf("unmarshal response: %w", err)
+//	}
+//
+//	// 返回sn字段的值
+//	return response.Results["sn"], nil
+//}

+ 1 - 0
aarch64/pjibot_guide/common/config/c_local.go

@@ -24,6 +24,7 @@ type localConfig struct {
 	OssBasePrefix        string     `yaml:"oss-base-prefix"`         // 云端配置文件的位置
 	CloudConfigFilename  string     `yaml:"cloud-config-filename"`   // 云端配置文件名称
 	CloudConfigLocalPath string     `yaml:"cloud-config-local-path"` // 将 oss 的配置文件下载到本地的位置
+	LocalWebsocketPort   string     `yaml:"local-websocket-port"`    // websocket端口号
 	RestartCmd           restartCmd `yaml:"restart-cmd"`             // 重启命令
 	EquipmentNo          string     // 当前设备的编号
 	SecretKey            string     // 当前设备的密钥

+ 1 - 1
aarch64/pjibot_guide/common/config/c_platform.go

@@ -41,7 +41,7 @@ var (
 	RecordTopics   []string
 )
 
-// InitPlatformConfig 初始化数据闭环平台的配置
+// 初始化数据闭环平台的配置
 func InitPlatformConfig() {
 	var err error
 	c_log.GlobalLogger.Info("获取数据闭环平台配置 - 开始")

+ 107 - 0
aarch64/pjibot_guide/common/config/c_websocket.go

@@ -0,0 +1,107 @@
+package config
+
+import (
+	"cicv-data-closedloop/common/config/c_log"
+	"fmt"
+	"github.com/gorilla/websocket"
+	"net/url"
+	"time"
+)
+
+var (
+	WsConn                 *websocket.Conn
+	reconnectionInProgress bool
+)
+
+// Request 结构体定义
+type Request struct {
+	Type      string      `json:"type"`
+	UUID      string      `json:"uuid"`
+	CommandID string      `json:"commandId"`
+	Parameter interface{} `json:"parameter"`
+}
+
+// Response 结构体定义
+type Response struct {
+	CommandID string            `json:"commandId"`
+	ErrorCode string            `json:"errorCode"`
+	Results   map[string]string `json:"results"`
+	Status    string            `json:"status"`
+	Time      int64             `json:"time"`
+	Type      string            `json:"type"`
+	UUID      string            `json:"uuid"`
+}
+
+// 状态消息 结构体定义
+type StatusMessage struct {
+	Type  string      `json:"type"`
+	Topic string      `json:"topic"`
+	Time  int64       `json:"time"`
+	Data  interface{} `json:"data"`
+}
+
+func keepAlive() {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			if err := WsConn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
+				c_log.GlobalLogger.Error("保持websocket连接活跃,发送ping - 失败。", err)
+				WsConn.Close()
+				c_log.GlobalLogger.Info("重试连接websocket...")
+				InitWebsocketConfig() // 重新连接
+				return
+			}
+		}
+	}
+}
+
+func InitWebsocketConfig() {
+	for {
+		// 防止重复调用
+		if reconnectionInProgress {
+			return
+		}
+		reconnectionInProgress = true
+
+		c_log.GlobalLogger.Info("初始化Websocket连接 - 开始。")
+		serverURL := LocalConfig.Node.Ip + ":" + LocalConfig.LocalWebsocketPort
+		path := "/"
+
+		// 构建WebSocket连接URL
+		u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
+		c_log.GlobalLogger.Info("URL:", u.String())
+
+		// 创建一个Dialer实例,用于建立WebSocket连接
+		dialer := websocket.Dialer{
+			ReadBufferSize:  1024,
+			WriteBufferSize: 1024,
+			// 可选:设置超时等
+			HandshakeTimeout: 5 * time.Second,
+		}
+
+		// 建立WebSocket连接
+		coon, _, err := dialer.Dial(u.String(), nil)
+		if err != nil {
+			fmt.Println("err:", err)
+			c_log.GlobalLogger.Error("初始化Websocket连接 - 失败。")
+			time.Sleep(5 * time.Second)
+			reconnectionInProgress = false
+			c_log.GlobalLogger.Info("重试连接websocket...")
+			continue
+		}
+
+		WsConn = coon
+		c_log.GlobalLogger.Info("初始化Websocket连接 - 成功。")
+
+		// 保持连接活跃
+		//go keepAlive()
+
+		// 连接成功,退出循环
+		reconnectionInProgress = false
+		break
+	}
+
+}

+ 3 - 3
aarch64/pjibot_guide/common/service/kill_self.go

@@ -22,17 +22,17 @@ var (
 	MutexKill   sync.Mutex
 )
 
-// KillSignal 停止信号,主从节点接收到数据后准备重启
+// 停止信号,主从节点接收到数据后准备重启
 type KillSignal struct {
 	NodeName       string
 	DropUploadData bool
 	Restart        bool
 }
 
-// KillService 定义要远程调用的类型和方法
+// 定义要远程调用的类型和方法
 type KillService struct{}
 
-// Kill 杀死自身程序,通过通道实现 方法必须满足RPC规范:函数有两个参数,第一个参数是请求,第二个是响应
+// 杀死自身程序,通过通道实现 方法必须满足RPC规范:函数有两个参数,第一个参数是请求,第二个是响应
 func (m *KillService) Kill(args KillSignal, reply *int) error {
 	c_log.GlobalLogger.Info("接收到自杀信号:", args)
 	// 1 杀死 rosbag record 命令

+ 177 - 79
aarch64/pjibot_guide/control/main.go

@@ -6,14 +6,27 @@ import (
 	"cicv-data-closedloop/aarch64/pjibot_guide/common/variable"
 	"cicv-data-closedloop/aarch64/pjibot_guide/control/pkg"
 	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
+	"fmt"
 	"net/rpc"
 	"os"
 	"runtime"
 	"time"
 )
 
-var applicationName = "pji-control"
+var (
+	applicationName = "pji-control"
+	localStatus     = "idle"
+	cloudStatus     = "NONE"
+	lastLocalStatus = "idle"
+	lastCloudStatus = "NONE"
+	launchedFlag    = false
+	renewedFlag     = false
+	configuredFlag  = false
+	renewTimer      *time.Timer // 续约定时器
+	RenewDur        = 5         // 续约时间为5分钟
+)
 
 func init() {
 	runtime.GOMAXPROCS(1)
@@ -26,97 +39,182 @@ func init() {
 	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
 	commonConfig.InitCloudConfig()
 	// 初始化rpc客户端,用于杀死旧的采集程序
+
+	// 初始化websocket配置
+	commonConfig.InitWebsocketConfig()
 }
 
-func main() {
-	go judgeState1()
-	go pkg.JudgeLocal()
-	go judgeState3()
-	select {}
+func renew() {
+	if renewTimer != nil {
+		renewTimer.Stop()
+	}
+	renewedFlag = true
+	renewTimer = time.AfterFunc(time.Duration(RenewDur)*time.Minute, func() {
+		renewedFlag = false
+	})
+	c_log.GlobalLogger.Infof("续约时间【%v】分钟 - 成功。", RenewDur)
+}
+
+func startMasterNode() {
+	c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
+	commonConfig.InitPlatformConfig()
+
+	if _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...); err != nil {
+		c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
+		os.Exit(-1)
+	}
+	c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
+
+	launchedFlag = true
+	c_log.GlobalLogger.Info("数采程序启动 - 成功。")
 }
 
-func judgeState1() {
-	init := true
-	turnLength := 60
-	lastStatus := "NONE"
-	//  轮询任务接口判断是否有更新
+func stopMasterNode() {
+	// 发送rpc信号杀死采集程序
+	var killArgs commonService.KillSignal
+	killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
+	c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号:", killArgs)
+	KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
+	if err != nil {
+		// 此处如果连接失败说明采集程序已经停止了
+		lastCloudStatus = "NONE"
+		c_log.GlobalLogger.Error("采集程序已经停止:", err)
+		return
+	}
+
+	reply := 0
+	if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
+		c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
+		// 这里可能会报错unexpected EOF但是不影响,先注释 close 和 continue
+		//KillRpcClient.Close()
+		//continue
+	}
+
+	c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
+	commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
+	if err = KillRpcClient.Close(); err != nil {
+		// 不做处理
+	}
+
+	launchedFlag = false
+	c_log.GlobalLogger.Info("数采程序关闭 - 成功。")
+}
+
+func main() {
+	localTurnLength := 2  // s
+	cloudTurnLength := 60 // s
+	waitStopLength := 1   // min
+
+	// 更新本地任务状态
+	go pkg.GetLocalStatus(&localStatus, &lastLocalStatus, localTurnLength)
+	// 更新云端任务状态
+	go pkg.GetCloudStatus(&cloudStatus, &lastCloudStatus, cloudTurnLength)
+
 	for {
-		c_log.GlobalLogger.Errorf("一轮次扫描时间【%v】秒:", turnLength)
-		if init {
-			time.Sleep(time.Duration(1) * time.Second)
-			init = false
+		if launchedFlag { // 当前已启动master节点
+			time.Sleep(time.Duration(cloudTurnLength) * time.Second)
 		} else {
-			time.Sleep(time.Duration(turnLength) * time.Second)
+			time.Sleep(time.Duration(localTurnLength) * time.Second)
 		}
-		// 1 获取当前设备的任务的 status
-		status, err := commonConfig.GetStatus(commonConfig.PlatformConfig.TaskConfigId)
-		if err != nil {
-			c_log.GlobalLogger.Error("获取配置status失败:", err)
-			continue
-		}
-		// 2 判断 status
-		// UN_CHANGE 没有新的任务,无需更改
-		// CHANGE 有新的任务,需要杀死旧的任务并重启
-		// NONE 设备没有配置任务,需要杀死旧的任务
-		if status == "UN_CHANGE" {
-			lastStatus = "UN_CHANGE"
-			continue
-		} else if status == "CHANGE" || status == "NONE" {
-			if lastStatus == "CHANGE" && status == "CHANGE" { // 供更新使用
-				commonConfig.InitPlatformConfig()
-				continue
-			}
-			if lastStatus == "NONE" && status == "NONE" {
-				continue
-			}
-
-			if lastStatus == "NONE" && status == "CHANGE" {
-				if _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...); err != nil {
-					c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
-					os.Exit(-1)
-				}
-				c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
-				lastStatus = status
-				c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
-				commonConfig.InitPlatformConfig()
-				continue
-			}
 
-			// 3 发送rpc信号杀死采集程序
-			var killArgs commonService.KillSignal
-			if lastStatus == "UN_CHANGE" && status == "CHANGE" {
-				killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: true}
-				c_log.GlobalLogger.Info("更新任务,发送rpc重启信号:", killArgs)
-			}
-			if lastStatus == "UN_CHANGE" && status == "NONE" {
-				killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
-				c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号:", killArgs)
-			}
+		fmt.Println("localStatus: ", localStatus, "lastLocalStatus: ", lastLocalStatus)
+		fmt.Println("cloudStatus: ", cloudStatus, "lastCloudStatus: ", lastCloudStatus)
 
-			KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
-			if err != nil {
-				// 此处如果连接失败说明采集程序已经停止了
-				lastStatus = "NONE"
-				c_log.GlobalLogger.Error("采集程序已经停止:", err)
-				continue
-			}
+		// 综合判断 cloudStatus 和 localStatus
+		// cloudStatus
+		// UN_CHANGE 没有新的任务,无需更改
+		// CHANGE 有新的任务,需要杀死旧的数采任务并重启
+		// NONE 设备没有配置任务,需要杀死旧的数采任务
+		// localStatus
+		// idle 空闲状态,此状态下不启动数采任务
+		// running 繁忙状态,此状态需要启动数采任务
+		// error 错误状态,此状态下不启动数采任务
 
-			reply := 0
-			if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
-				c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
-				// 这里可能会报错unexpected EOF但是不影响,先注释 close 和 continue
-				//KillRpcClient.Close()
-				//continue
+		// 本地任务状态负责启停master
+		if localStatus == "running" {
+			if !launchedFlag { // 目前未启动数采程序
+				c_log.GlobalLogger.Info("数采程序启动 - 开始。")
+				startMasterNode()
+			} else { // 已启动数采程序
+				c_log.GlobalLogger.Info("设备仍处于运行状态,续约 - 开始。")
+				// 续约
+				renew()
 			}
-			lastStatus = status
-			c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
-			commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
-			if err = KillRpcClient.Close(); err != nil {
-				// 不做处理
+			continue
+		} else if localStatus == "idle" {
+			if !renewedFlag && launchedFlag && len(entity.TimeWindowConsumerQueue) == 0 {
+				c_log.GlobalLogger.Info("设备不在运行状态且没有待处理的数据,数采程序关闭 - 开始。")
+				time.Sleep(time.Duration(waitStopLength) * time.Minute)
+				stopMasterNode()
 			}
+			continue
+		} else if localStatus == "error" {
+			c_log.GlobalLogger.Error("设备运行状态出错,停止数采程序。")
+			stopMasterNode()
 		} else {
-			c_log.GlobalLogger.Error("未知的采集任务状态。【status】=", status)
+			c_log.GlobalLogger.Error("未知的设备运行状态。【status】=", localStatus)
 		}
+
+		// 云端任务状态负责更新配置
+		go pkg.GetCloudConfig(cloudStatus, lastCloudStatus, cloudTurnLength)
+
+		//if cloudStatus == "UN_CHANGE" {
+		//	continue
+		//} else if cloudStatus == "CHANGE" || cloudStatus == "NONE" {
+		//	if lastCloudStatus == "CHANGE" && cloudStatus == "CHANGE" { // 供更新使用
+		//		commonConfig.InitPlatformConfig()
+		//		continue
+		//	}
+		//	if lastCloudStatus == "NONE" && cloudStatus == "NONE" {
+		//		continue
+		//	}
+		//
+		//	if lastCloudStatus == "NONE" && cloudStatus == "CHANGE" {
+		//		if _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...); err != nil {
+		//			c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
+		//			os.Exit(-1)
+		//		}
+		//		c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
+		//		c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
+		//		commonConfig.InitPlatformConfig()
+		//		continue
+		//	}
+		//
+		//	// 3 发送rpc信号杀死采集程序
+		//	var killArgs commonService.KillSignal
+		//	if lastCloudStatus == "UN_CHANGE" && cloudStatus == "CHANGE" {
+		//		killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: true}
+		//		c_log.GlobalLogger.Info("更新任务,发送rpc重启信号:", killArgs)
+		//	}
+		//	if lastCloudStatus == "UN_CHANGE" && cloudStatus == "NONE" {
+		//		killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
+		//		c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号:", killArgs)
+		//	}
+		//
+		//	KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
+		//	if err != nil {
+		//		// 此处如果连接失败说明采集程序已经停止了
+		//		lastCloudStatus = "NONE"
+		//		c_log.GlobalLogger.Error("采集程序已经停止:", err)
+		//		continue
+		//	}
+		//
+		//	reply := 0
+		//	if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
+		//		c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
+		//		// 这里可能会报错unexpected EOF但是不影响,先注释 close 和 continue
+		//		//KillRpcClient.Close()
+		//		//continue
+		//	}
+		//
+		//	c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
+		//	commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
+		//	if err = KillRpcClient.Close(); err != nil {
+		//		// 不做处理
+		//	}
+		//} else {
+		//	c_log.GlobalLogger.Error("未知的采集任务状态。【cloudStatus】=", cloudStatus)
+		//}
 	}
+	//select {}
 }
-func judgeState3() {}

+ 36 - 0
aarch64/pjibot_guide/control/pkg/judge_cloud.go

@@ -1 +1,37 @@
 package pkg
+
+import (
+	commonConfig "cicv-data-closedloop/aarch64/pjibot_guide/common/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"time"
+)
+
+// UN_CHANGE 没有新的任务
+// CHANGE 有新的任务
+// NONE 设备没有配置任务
+func GetCloudStatus(cloudStatus *string, lastCloudStatus *string, turnLength int) {
+	// 轮询云端任务状态
+	for {
+		time.Sleep(time.Duration(turnLength) * time.Second)
+
+		*lastCloudStatus = *cloudStatus
+		taskStatus, err := commonConfig.GetStatus(commonConfig.PlatformConfig.TaskConfigId)
+		if err != nil {
+			c_log.GlobalLogger.Error("获取云端配置status失败:", err)
+			continue
+		}
+		if taskStatus == "" || taskStatus == " " {
+			taskStatus = "NONE"
+		}
+		*cloudStatus = taskStatus
+	}
+}
+
+func GetCloudConfig(cloudStatus string, lastCloudStatus string, turnLength int) {
+	for {
+		time.Sleep(time.Duration(turnLength) * time.Second)
+		if cloudStatus == "CHANGE" {
+			commonConfig.InitPlatformConfig()
+		}
+	}
+}

+ 40 - 1
aarch64/pjibot_guide/control/pkg/judge_local.go

@@ -1,3 +1,42 @@
 package pkg
 
-func JudgeLocal() {}
+import (
+	"cicv-data-closedloop/aarch64/pjibot_guide/common/config"
+	"encoding/json"
+	"fmt"
+	"log"
+	"time"
+)
+
+// idle 空闲状态,此状态下机器人可进行任务下发
+// running 繁忙状态,此状态机器人不接受新任务
+// error 错误状态(硬件,不能正常工作的)
+func GetLocalStatus(localStatus *string, lastLocalStatus *string, turnLength int) {
+	// 轮询本地任务状态
+	for {
+		time.Sleep(time.Duration(turnLength) * time.Second)
+
+		*lastLocalStatus = *localStatus
+		_, msg, err := config.WsConn.ReadMessage()
+		if err != nil {
+			log.Println("Error in receive:", err)
+			break
+		}
+		//log.Printf("Received: %s\n", msg)
+
+		// 将响应字节解码为JSON
+		var statusMessage config.StatusMessage
+		err = json.Unmarshal(msg, &statusMessage)
+		if err != nil {
+			log.Println("Error in receive:", err)
+			break
+		}
+		if statusMessage.Type == "push" && statusMessage.Topic == "robotStatus" {
+			//fmt.Println("statusMessage:", statusMessage)
+			data := statusMessage.Data.(map[string]interface{})
+			//fmt.Println("statusMessage.Data", data)
+			fmt.Println("statusMessage.Data[\"status\"]", data["status"])
+			*localStatus = data["status"].(string)
+		}
+	}
+}

+ 2 - 0
aarch64/pjibot_guide/引导机器人默认配置文件-local-config.yaml

@@ -11,6 +11,8 @@ oss-base-prefix: pji-double-camera/
 cloud-config-filename: cloud-config.yaml
 # 将oss上的配置文件下载到本地的路径
 cloud-config-local-path: /root/cicv-data-closedloop/config/cloud-config.yaml
+# websocket端口号
+local-websocket-port: 9002
 restart-cmd:
   dir: "/root/cicv-data-closedloop/"
   name: "sh"

+ 1 - 0
aarch64/pjisuv/金龙车-cloud-config.yaml

@@ -61,6 +61,7 @@ hosts:
         - "ROS_DISTRO=noetic"
         - "_=/usr/bin/env"
     topics:
+      - /fault_info
       - /cam_3D # 感知
       - /car_wheel
       - /ch64w/rear

+ 23 - 1
tools/pji_api/main/main.go

@@ -29,6 +29,14 @@ type Response struct {
 	UUID      string            `json:"uuid"`
 }
 
+// 状态消息 结构体定义
+type StatusMessage struct {
+	Type  string      `json:"type"`
+	Topic string      `json:"topic"`
+	Time  int64       `json:"time"`
+	Data  interface{} `json:"data"`
+}
+
 // SendWebsocketRequest 发送WebSocket请求并返回sn字段的值
 func SendWebsocketRequest(serverURL, path string, request Request) (string, error) {
 	// 构建WebSocket连接URL
@@ -129,12 +137,26 @@ func SendWebsocketRequest(serverURL, path string, request Request) (string, erro
 
 func receiveHandler(connection *websocket.Conn) {
 	for {
+		time.Sleep(1 * time.Second)
 		_, msg, err := connection.ReadMessage()
 		if err != nil {
 			log.Println("Error in receive:", err)
-			return
+			break
 		}
 		log.Printf("Received: %s\n", msg)
+		// 将响应字节解码为JSON
+		var statusMessage StatusMessage
+		err = json.Unmarshal(msg, &statusMessage)
+		if err != nil {
+			log.Println("Error in receive:", err)
+			break
+		}
+		fmt.Println("statusMessage:", statusMessage)
+		if statusMessage.Type == "push" && statusMessage.Topic == "robotStatus" {
+			data := statusMessage.Data.(map[string]interface{})
+			fmt.Println("statusMessage.Data", data)
+			fmt.Println("statusMessage.Data[\"taskStatus\"]", data["taskStatus"])
+		}
 	}
 }