Browse Source

Merge remote-tracking branch 'origin/master'

LingxinMeng 7 tháng trước cách đây
mục cha
commit
978ef4858e

+ 66 - 70
aarch64/pjibot_guide/common/config/c_cloud.go

@@ -3,12 +3,8 @@ package config
 import (
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
-	"encoding/json"
 	"errors"
-	"fmt"
-	"github.com/gorilla/websocket"
 	"gopkg.in/yaml.v3"
-	"net/url"
 	"os"
 	"strings"
 	"sync"
@@ -88,24 +84,24 @@ type cloudConfig struct {
 	Monitor               MonitorStruct      `yaml:"monitor"`
 }
 
-// Request 结构体定义
-type Request struct {
-	Type      string      `json:"type"`
-	UUID      string      `json:"uuid"`
-	CommandID string      `json:"commandId"`
-	Parameter interface{} `json:"parameter"`
-}
-
-// Response 结构体定义
-type Response struct {
-	CommandID string            `json:"commandId"`
-	ErrorCode string            `json:"errorCode"`
-	Results   map[string]string `json:"results"`
-	Status    string            `json:"status"`
-	Time      int64             `json:"time"`
-	Type      string            `json:"type"`
-	UUID      string            `json:"uuid"`
-}
+//// Request 结构体定义
+//type Request struct {
+//	Type      string      `json:"type"`
+//	UUID      string      `json:"uuid"`
+//	CommandID string      `json:"commandId"`
+//	Parameter interface{} `json:"parameter"`
+//}
+//
+//// Response 结构体定义
+//type Response struct {
+//	CommandID string            `json:"commandId"`
+//	ErrorCode string            `json:"errorCode"`
+//	Results   map[string]string `json:"results"`
+//	Status    string            `json:"status"`
+//	Time      int64             `json:"time"`
+//	Type      string            `json:"type"`
+//	UUID      string            `json:"uuid"`
+//}
 
 var (
 	CloudConfig      cloudConfig
@@ -269,51 +265,51 @@ func getSnCode() (string, error) {
 	return snCode, nil
 }
 
-// SendWebsocketRequest 发送WebSocket请求并返回sn字段的值
-func SendWebsocketRequest(serverURL, path string, request Request) (string, error) {
-	// 构建WebSocket连接URL
-	u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
-
-	// 创建一个Dialer实例,用于建立WebSocket连接
-	dialer := websocket.Dialer{
-		ReadBufferSize:  1024,
-		WriteBufferSize: 1024,
-		// 可选:设置超时等
-		HandshakeTimeout: 5 * time.Second,
-	}
-
-	// 建立WebSocket连接
-	conn, _, err := dialer.Dial(u.String(), nil)
-	if err != nil {
-		return "", fmt.Errorf("dial: %w", err)
-	}
-	defer conn.Close()
-
-	// 将请求JSON编码为字节
-	requestJSON, err := json.Marshal(request)
-	if err != nil {
-		return "", fmt.Errorf("marshal request: %w", err)
-	}
-
-	// 发送WebSocket消息
-	err = conn.WriteMessage(websocket.TextMessage, requestJSON)
-	if err != nil {
-		return "", fmt.Errorf("write: %w", err)
-	}
-
-	// 读取WebSocket响应
-	_, responseBytes, err := conn.ReadMessage()
-	if err != nil {
-		return "", fmt.Errorf("read: %w", err)
-	}
-
-	// 将响应字节解码为JSON
-	var response Response
-	err = json.Unmarshal(responseBytes, &response)
-	if err != nil {
-		return "", fmt.Errorf("unmarshal response: %w", err)
-	}
-
-	// 返回sn字段的值
-	return response.Results["sn"], nil
-}
+//// SendWebsocketRequest 发送WebSocket请求并返回sn字段的值
+//func SendWebsocketRequest(serverURL, path string, request Request) (string, error) {
+//	// 构建WebSocket连接URL
+//	u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
+//
+//	// 创建一个Dialer实例,用于建立WebSocket连接
+//	dialer := websocket.Dialer{
+//		ReadBufferSize:  1024,
+//		WriteBufferSize: 1024,
+//		// 可选:设置超时等
+//		HandshakeTimeout: 5 * time.Second,
+//	}
+//
+//	// 建立WebSocket连接
+//	conn, _, err := dialer.Dial(u.String(), nil)
+//	if err != nil {
+//		return "", fmt.Errorf("dial: %w", err)
+//	}
+//	defer conn.Close()
+//
+//	// 将请求JSON编码为字节
+//	requestJSON, err := json.Marshal(request)
+//	if err != nil {
+//		return "", fmt.Errorf("marshal request: %w", err)
+//	}
+//
+//	// 发送WebSocket消息
+//	err = conn.WriteMessage(websocket.TextMessage, requestJSON)
+//	if err != nil {
+//		return "", fmt.Errorf("write: %w", err)
+//	}
+//
+//	// 读取WebSocket响应
+//	_, responseBytes, err := conn.ReadMessage()
+//	if err != nil {
+//		return "", fmt.Errorf("read: %w", err)
+//	}
+//
+//	// 将响应字节解码为JSON
+//	var response Response
+//	err = json.Unmarshal(responseBytes, &response)
+//	if err != nil {
+//		return "", fmt.Errorf("unmarshal response: %w", err)
+//	}
+//
+//	// 返回sn字段的值
+//	return response.Results["sn"], nil
+//}

+ 1 - 0
aarch64/pjibot_guide/common/config/c_local.go

@@ -24,6 +24,7 @@ type localConfig struct {
 	OssBasePrefix        string     `yaml:"oss-base-prefix"`         // 云端配置文件的位置
 	CloudConfigFilename  string     `yaml:"cloud-config-filename"`   // 云端配置文件名称
 	CloudConfigLocalPath string     `yaml:"cloud-config-local-path"` // 将 oss 的配置文件下载到本地的位置
+	LocalWebsocketPort   string     `yaml:"local-websocket-port"`    // websocket端口号
 	RestartCmd           restartCmd `yaml:"restart-cmd"`             // 重启命令
 	EquipmentNo          string     // 当前设备的编号
 	SecretKey            string     // 当前设备的密钥

+ 106 - 0
aarch64/pjibot_guide/common/config/c_websocket.go

@@ -0,0 +1,106 @@
+package config
+
+import (
+	"cicv-data-closedloop/common/config/c_log"
+	"fmt"
+	"github.com/gorilla/websocket"
+	"net/url"
+	"time"
+)
+
+var (
+	WsConn                 *websocket.Conn
+	reconnectionInProgress bool
+)
+
+// Request 结构体定义
+type Request struct {
+	Type      string      `json:"type"`
+	UUID      string      `json:"uuid"`
+	CommandID string      `json:"commandId"`
+	Parameter interface{} `json:"parameter"`
+}
+
+// Response 结构体定义
+type Response struct {
+	CommandID string            `json:"commandId"`
+	ErrorCode string            `json:"errorCode"`
+	Results   map[string]string `json:"results"`
+	Status    string            `json:"status"`
+	Time      int64             `json:"time"`
+	Type      string            `json:"type"`
+	UUID      string            `json:"uuid"`
+}
+
+// 状态消息 结构体定义
+type StatusMessage struct {
+	Type  string      `json:"type"`
+	Topic string      `json:"topic"`
+	Time  int64       `json:"time"`
+	Data  interface{} `json:"data"`
+}
+
+func keepAlive() {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			if err := WsConn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
+				c_log.GlobalLogger.Error("保持websocket连接活跃,发送ping - 失败。")
+				c_log.GlobalLogger.Info("重试连接websocket...")
+				InitWebsocketConfig() // 重新连接
+				return
+			}
+		}
+	}
+}
+
+func InitWebsocketConfig() {
+	for {
+		// 防止重复调用
+		if reconnectionInProgress {
+			return
+		}
+		reconnectionInProgress = true
+
+		c_log.GlobalLogger.Info("初始化Websocket连接 - 开始。")
+		serverURL := LocalConfig.Node.Ip + ":" + LocalConfig.LocalWebsocketPort
+		path := "/"
+
+		// 构建WebSocket连接URL
+		u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
+		c_log.GlobalLogger.Info("URL:", u.String())
+
+		// 创建一个Dialer实例,用于建立WebSocket连接
+		dialer := websocket.Dialer{
+			ReadBufferSize:  1024,
+			WriteBufferSize: 1024,
+			// 可选:设置超时等
+			HandshakeTimeout: 5 * time.Second,
+		}
+
+		// 建立WebSocket连接
+		coon, _, err := dialer.Dial(u.String(), nil)
+		if err != nil {
+			fmt.Println("err:", err)
+			c_log.GlobalLogger.Error("初始化Websocket连接 - 失败。")
+			time.Sleep(5 * time.Second)
+			reconnectionInProgress = false
+			c_log.GlobalLogger.Info("重试连接websocket...")
+			continue
+		}
+
+		WsConn = coon
+		c_log.GlobalLogger.Info("初始化Websocket连接 - 成功。")
+
+		// 保持连接活跃
+		go keepAlive()
+
+		// 连接成功,退出循环
+		reconnectionInProgress = false
+		break
+	}
+
+}

+ 20 - 5
aarch64/pjibot_guide/control/main.go

@@ -7,13 +7,18 @@ import (
 	"cicv-data-closedloop/aarch64/pjibot_guide/control/pkg"
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
+	"fmt"
 	"net/rpc"
 	"os"
 	"runtime"
 	"time"
 )
 
-var applicationName = "pji-control"
+var (
+	applicationName = "pji-control"
+	localStatus     = "idle"
+	cloudStatus     = "NONE"
+)
 
 func init() {
 	runtime.GOMAXPROCS(1)
@@ -26,13 +31,23 @@ func init() {
 	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
 	commonConfig.InitCloudConfig()
 	// 初始化rpc客户端,用于杀死旧的采集程序
+
+	// 初始化websocket配置
+	commonConfig.InitWebsocketConfig()
 }
 
 func main() {
-	go judgeState1()
-	go pkg.JudgeLocal()
-	go judgeState3()
-	select {}
+	//go judgeState1()
+	//go pkg.JudgeLocal()
+	//go judgeState3()
+
+	go pkg.GetLocalStatus(&localStatus)
+
+	for {
+		time.Sleep(5 * time.Second)
+		fmt.Println("localStatus: ", localStatus)
+	}
+	//select {}
 }
 
 func judgeState1() {

+ 42 - 1
aarch64/pjibot_guide/control/pkg/judge_local.go

@@ -1,3 +1,44 @@
 package pkg
 
-func JudgeLocal() {}
+import (
+	"cicv-data-closedloop/aarch64/pjibot_guide/common/config"
+	"encoding/json"
+	"fmt"
+	"log"
+	"time"
+)
+
+type StatusMessage struct {
+	Type  string      `json:"type"`
+	Topic string      `json:"topic"`
+	Time  int64       `json:"time"`
+	Data  interface{} `json:"data"`
+}
+
+func GetLocalStatus(localStatus *string) {
+	for {
+		time.Sleep(2 * time.Second)
+		_, msg, err := config.WsConn.ReadMessage()
+		if err != nil {
+			log.Println("Error in receive:", err)
+			break
+		}
+		log.Printf("Received: %s\n", msg)
+
+		// 将响应字节解码为JSON
+		var statusMessage StatusMessage
+		err = json.Unmarshal(msg, &statusMessage)
+		if err != nil {
+			log.Println("Error in receive:", err)
+			break
+		}
+		fmt.Println("statusMessage:", statusMessage)
+
+		if statusMessage.Type == "push" && statusMessage.Topic == "robotStatus" {
+			data := statusMessage.Data.(map[string]interface{})
+			fmt.Println("statusMessage.Data", data)
+			fmt.Println("statusMessage.Data[\"taskStatus\"]", data["taskStatus"])
+			*localStatus = data["taskStatus"].(string)
+		}
+	}
+}

+ 2 - 0
aarch64/pjibot_guide/引导机器人默认配置文件-local-config.yaml

@@ -11,6 +11,8 @@ oss-base-prefix: pji-double-camera/
 cloud-config-filename: cloud-config.yaml
 # 将oss上的配置文件下载到本地的路径
 cloud-config-local-path: /root/cicv-data-closedloop/config/cloud-config.yaml
+# websocket端口号
+local-websocket-port: 9002
 restart-cmd:
   dir: "/root/cicv-data-closedloop/"
   name: "sh"

+ 23 - 1
tools/pji_api/main/main.go

@@ -29,6 +29,14 @@ type Response struct {
 	UUID      string            `json:"uuid"`
 }
 
+// 状态消息 结构体定义
+type StatusMessage struct {
+	Type  string      `json:"type"`
+	Topic string      `json:"topic"`
+	Time  int64       `json:"time"`
+	Data  interface{} `json:"data"`
+}
+
 // SendWebsocketRequest 发送WebSocket请求并返回sn字段的值
 func SendWebsocketRequest(serverURL, path string, request Request) (string, error) {
 	// 构建WebSocket连接URL
@@ -129,12 +137,26 @@ func SendWebsocketRequest(serverURL, path string, request Request) (string, erro
 
 func receiveHandler(connection *websocket.Conn) {
 	for {
+		time.Sleep(1 * time.Second)
 		_, msg, err := connection.ReadMessage()
 		if err != nil {
 			log.Println("Error in receive:", err)
-			return
+			break
 		}
 		log.Printf("Received: %s\n", msg)
+		// 将响应字节解码为JSON
+		var statusMessage StatusMessage
+		err = json.Unmarshal(msg, &statusMessage)
+		if err != nil {
+			log.Println("Error in receive:", err)
+			break
+		}
+		fmt.Println("statusMessage:", statusMessage)
+		if statusMessage.Type == "push" && statusMessage.Topic == "robotStatus" {
+			data := statusMessage.Data.(map[string]interface{})
+			fmt.Println("statusMessage.Data", data)
+			fmt.Println("statusMessage.Data[\"taskStatus\"]", data["taskStatus"])
+		}
 	}
 }