Просмотр исходного кода

feat: 配送及巡检机器人修改配置文件及结构体&添加websocket

HeWang 5 месяцев назад
Родитель
Сommit
4be62c3336

+ 39 - 34
aarch64/pjibot_delivery/common/config/c_cloud.go

@@ -52,6 +52,10 @@ type trigger struct {
 	Topics []string `yaml:"topics"`
 }
 
+type CollectNumPlusStruct struct {
+	Url string `yaml:"url"`
+}
+
 type CollectLimitStruct struct {
 	Url   string `yaml:"url"`
 	Flag  int    `yaml:"flag"`
@@ -62,42 +66,43 @@ type CollectLimitStruct struct {
 }
 
 type cloudConfig struct {
-	CollectLimit          CollectLimitStruct `yaml:"collect-limit"`
-	HasOneMsgTopic        bool               `yaml:"has-one-msg-topic"` // 是否存在只发单帧的话题
-	FullCollect           bool               `yaml:"full-collect"`
-	ConfigRefreshInterval int                `yaml:"config-refresh-interval"` // 配置刷新时间间隔
-	BagNumber             int                `yaml:"bag-number"`
-	TimeWindowSendGap     int                `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
-	BagDataDir            string             `yaml:"bag-data-dir"`
-	BagCopyDir            string             `yaml:"bag-copy-dir"`
-	TriggersDir           string             `yaml:"triggers-dir"`
-	RpcPort               string             `yaml:"rpc-port"`
-	Triggers              []trigger          `yaml:"triggers"`
-	Hosts                 []hostStruct       `yaml:"hosts"`
-	Ros                   ros                `yaml:"ros"`
-	Platform              platform           `yaml:"platform"`
-	Disk                  disk               `yaml:"disk"`
-	Monitor               MonitorStruct      `yaml:"monitor"`
+	CollectLimit          CollectLimitStruct   `yaml:"collect-limit"`
+	CollectNumPlus        CollectNumPlusStruct `yaml:"collect-num-plus"`
+	HasOneMsgTopic        bool                 `yaml:"has-one-msg-topic"` // 是否存在只发单帧的话题
+	FullCollect           bool                 `yaml:"full-collect"`
+	ConfigRefreshInterval int                  `yaml:"config-refresh-interval"` // 配置刷新时间间隔
+	BagNumber             int                  `yaml:"bag-number"`
+	TimeWindowSendGap     int                  `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
+	BagDataDir            string               `yaml:"bag-data-dir"`
+	BagCopyDir            string               `yaml:"bag-copy-dir"`
+	TriggersDir           string               `yaml:"triggers-dir"`
+	RpcPort               string               `yaml:"rpc-port"`
+	Triggers              []trigger            `yaml:"triggers"`
+	Hosts                 []hostStruct         `yaml:"hosts"`
+	Ros                   ros                  `yaml:"ros"`
+	Platform              platform             `yaml:"platform"`
+	Disk                  disk                 `yaml:"disk"`
+	Monitor               MonitorStruct        `yaml:"monitor"`
 }
 
-// Request 结构体定义
-type Request struct {
-	Type      string      `json:"type"`
-	UUID      string      `json:"uuid"`
-	CommandID string      `json:"commandId"`
-	Parameter interface{} `json:"parameter"`
-}
-
-// Response 结构体定义
-type Response struct {
-	CommandID string            `json:"commandId"`
-	ErrorCode string            `json:"errorCode"`
-	Results   map[string]string `json:"results"`
-	Status    string            `json:"status"`
-	Time      int64             `json:"time"`
-	Type      string            `json:"type"`
-	UUID      string            `json:"uuid"`
-}
+//// Request 结构体定义
+//type Request struct {
+//	Type      string      `json:"type"`
+//	UUID      string      `json:"uuid"`
+//	CommandID string      `json:"commandId"`
+//	Parameter interface{} `json:"parameter"`
+//}
+//
+//// Response 结构体定义
+//type Response struct {
+//	CommandID string            `json:"commandId"`
+//	ErrorCode string            `json:"errorCode"`
+//	Results   map[string]string `json:"results"`
+//	Status    string            `json:"status"`
+//	Time      int64             `json:"time"`
+//	Type      string            `json:"type"`
+//	UUID      string            `json:"uuid"`
+//}
 
 var (
 	CloudConfig      cloudConfig

+ 1 - 0
aarch64/pjibot_delivery/common/config/c_local.go

@@ -24,6 +24,7 @@ type localConfig struct {
 	OssBasePrefix        string     `yaml:"oss-base-prefix"`         // 云端配置文件的位置
 	CloudConfigFilename  string     `yaml:"cloud-config-filename"`   // 云端配置文件名称
 	CloudConfigLocalPath string     `yaml:"cloud-config-local-path"` // 将 oss 的配置文件下载到本地的位置
+	LocalWebsocketPort   string     `yaml:"local-websocket-port"`    // websocket端口号
 	RestartCmd           restartCmd `yaml:"restart-cmd"`             // 重启命令
 	EquipmentNo          string     // 当前设备的编号
 	SecretKey            string     // 当前设备的密钥

+ 225 - 0
aarch64/pjibot_delivery/common/config/c_websocket.go

@@ -0,0 +1,225 @@
+package config
+
+import (
+	"cicv-data-closedloop/common/config/c_log"
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/websocket"
+	"net/url"
+	"time"
+)
+
+var (
+	WsConn                 *websocket.Conn
+	reconnectionInProgress bool
+)
+
+// Request 结构体定义
+type Request struct {
+	Type      string      `json:"type"`
+	UUID      string      `json:"uuid"`
+	CommandID string      `json:"commandId"`
+	Parameter interface{} `json:"parameter"`
+}
+
+// Request1 结构体定义
+type Request1 struct {
+	Type      string      `json:"type"`
+	CommandID string      `json:"commandId"`
+	Parameter interface{} `json:"parameter"`
+}
+
+// Response 结构体定义
+type Response struct {
+	CommandID string            `json:"commandId"`
+	ErrorCode string            `json:"errorCode"`
+	Results   map[string]string `json:"results"`
+	Status    string            `json:"status"`
+	Time      int64             `json:"time"`
+	Type      string            `json:"type"`
+	UUID      string            `json:"uuid"`
+}
+
+// StatusMessage 状态消息 结构体定义
+type StatusMessage struct {
+	Type  string      `json:"type"`
+	Topic string      `json:"topic"`
+	Time  int64       `json:"time"`
+	Data  interface{} `json:"data"`
+}
+
+func keepAlive() {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		//c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err)
+	}
+
+	for {
+		select {
+		case <-ticker.C:
+			err := WsConn.WriteMessage(websocket.TextMessage, requestJSON)
+			if err != nil {
+				//c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err)
+				WsConn.Close()
+				//c_log.GlobalLogger.Info("重试连接websocket...")
+				ConnectWebsocket() // 重新连接
+				continue
+			}
+			//c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。")
+		}
+	}
+}
+
+func SendWebsocketHeartbeat(conn *websocket.Conn, maxRetries int) (bool, error) {
+
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	// 将请求JSON编码为字节
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		return false, fmt.Errorf("marshal request: %w", err)
+	}
+
+	// 发送WebSocket消息
+	err = conn.WriteMessage(websocket.TextMessage, requestJSON)
+	if err != nil {
+		return false, fmt.Errorf("write: %w", err)
+	}
+
+	count := 0
+	for {
+		if count > maxRetries {
+			return false, fmt.Errorf("保持websocket连接活跃,读取websocket消息超过最大重试次数。")
+		}
+		time.Sleep(1 * time.Second)
+		_, message, err := conn.ReadMessage()
+		if err != nil {
+			c_log.GlobalLogger.Error("保持websocket连接活跃,读取websocket消息 - 失败 ", err, " 继续读取消息。")
+			return false, err
+			//continue
+		}
+
+		var response Response
+		err = json.Unmarshal(message, &response)
+		c_log.GlobalLogger.Info("response ", response)
+		if err == nil && response.Type == "response" {
+			c_log.GlobalLogger.Info("response1 ", response)
+			return true, err
+		}
+		count++
+	}
+}
+
+func sendRequestAndAwaitResponse(ws *websocket.Conn) ([]byte, error) {
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err)
+		return nil, err
+	}
+
+	err = ws.WriteMessage(websocket.TextMessage, requestJSON)
+	if err != nil {
+		c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err)
+		return nil, err
+	}
+	c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。")
+
+	// 使用channel等待响应
+	responseChan := make(chan []byte)
+	go handleMessages(ws, responseChan)
+
+	select {
+	case response := <-responseChan:
+		c_log.GlobalLogger.Error("保持websocket连接活跃,等待心跳响应 - 成功。")
+		return response, nil
+	case <-time.After(60 * time.Second): // 设置超时时间
+		c_log.GlobalLogger.Error("保持websocket连接活跃,等待心跳响应 - 超时。")
+		close(responseChan)
+		return nil, fmt.Errorf("保持websocket连接活跃,等待心跳响应 - 超时。")
+	}
+}
+
+func handleMessages(ws *websocket.Conn, responseChan chan<- []byte) {
+	for {
+		time.Sleep(100 * time.Millisecond)
+		_, message, err := ws.ReadMessage()
+		if err != nil {
+			c_log.GlobalLogger.Error("保持websocket连接活跃,读取websocket消息 - 失败 ", err)
+			return
+		}
+
+		var response Response
+		if err := json.Unmarshal(message, &response); err == nil && response.Type == "response" {
+			responseChan <- message
+			close(responseChan)
+			return
+		}
+	}
+}
+
+func ConnectWebsocket() {
+	for {
+		// 防止重复调用
+		if reconnectionInProgress {
+			return
+		}
+		reconnectionInProgress = true
+		//c_log.GlobalLogger.Info("初始化Websocket连接 - 开始。")
+		serverURL := LocalConfig.Node.Ip + ":" + LocalConfig.LocalWebsocketPort
+		path := "/"
+
+		// 构建WebSocket连接URL
+		u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
+		//c_log.GlobalLogger.Info("URL:", u.String())
+
+		// 创建一个Dialer实例,用于建立WebSocket连接
+		dialer := websocket.Dialer{
+			ReadBufferSize:  1024,
+			WriteBufferSize: 1024,
+			// 可选:设置超时等
+			HandshakeTimeout: 5 * time.Minute,
+		}
+
+		// 建立WebSocket连接
+		coon, _, err := dialer.Dial(u.String(), nil)
+		if err != nil {
+			fmt.Println("err:", err)
+			//c_log.GlobalLogger.Error("初始化Websocket连接 - 失败。")
+			time.Sleep(5 * time.Second)
+			reconnectionInProgress = false
+			//c_log.GlobalLogger.Info("重试连接websocket...")
+			continue
+		}
+
+		WsConn = coon
+		//c_log.GlobalLogger.Info("初始化Websocket连接 - 成功。")
+		// 连接成功,退出循环
+		reconnectionInProgress = false
+		break
+	}
+}
+
+func InitWebsocketConfig() {
+	ConnectWebsocket()
+	// 保持连接活跃
+	go keepAlive()
+}

+ 2 - 0
aarch64/pjibot_delivery/control/main.go

@@ -24,6 +24,8 @@ func init() {
 	commonConfig.InitOssConfig()
 	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
 	commonConfig.InitCloudConfig()
+	// 初始化websocket配置
+	commonConfig.InitWebsocketConfig()
 }
 
 func main() {

+ 5 - 0
aarch64/pjibot_delivery/配送机器人默认配置文件-cloud-config.yaml

@@ -1,10 +1,13 @@
 ---
 collect-limit:
   url: http://36.110.106.142:12341/web_server/collect_limit/can_collect
+  flag: 1 # 数采频率限制标志 0 - 关闭数采频率限制  1 - 开启数采频率限制
   day: 3
   week: 7
   month: 30
   year: 365
+collect-num-plus:
+  url: http://36.110.106.142:12341/web_server/collect_limit/plus_collect_num
 monitor:
   url: http://36.110.106.142:12341/web_server/monitor/insert
 platform:
@@ -17,6 +20,8 @@ config-refresh-interval: 60
 disk:
   name: /dev/nvme0n1p1 # 磁盘名称
   used: 108000000000 # 磁盘占用阈值,单位bytes
+  path:
+    - /home/pji/cicv-data-closedloop
 has-one-msg: false
 bag-data-dir: /home/pji/cicv-data-closedloop/data/
 bag-copy-dir: /home/pji/cicv-data-closedloop/copy/

+ 2 - 0
aarch64/pjibot_delivery/配送机器人默认配置文件-local-config.yaml

@@ -11,6 +11,8 @@ oss-base-prefix: pjibot-delivery/
 cloud-config-filename: cloud-config.yaml
 # 将oss上的配置文件下载到本地的路径
 cloud-config-local-path: /home/pji/cicv-data-closedloop/config/cloud-config.yaml
+# websocket端口号
+local-websocket-port: 9002
 restart-cmd:
   dir: "/home/pji/cicv-data-closedloop/"
   name: "sh"

+ 25 - 21
aarch64/pjibot_guide/common/config/c_cloud.go

@@ -58,6 +58,10 @@ type CollectLimitStruct struct {
 	Year  int    `yaml:"year"`
 }
 
+type CollectNumPlusStruct struct {
+	Url string `yaml:"url"`
+}
+
 type DataDirStruct struct {
 	Src    string   `yaml:"src"`
 	SrcSub []string `yaml:"src-sub"`
@@ -65,27 +69,27 @@ type DataDirStruct struct {
 }
 
 type cloudConfig struct {
-	CollectLimit          CollectLimitStruct `yaml:"collect-limit"`
-	CollectNumPlus        CollectLimitStruct `yaml:"collect-num-plus"`
-	DataDir               DataDirStruct      `yaml:"data-dir"`
-	MapBufFiles           []string           `yaml:"map-buf-files"`
-	FullCollect           bool               `yaml:"full-collect"`
-	ConfigRefreshInterval int                `yaml:"config-refresh-interval"` // 配置刷新时间间隔
-	BagNumber             int                `yaml:"bag-number"`
-	TimeWindowSendGap     int                `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
-	MapBagPath            string             `yaml:"map-bag-path"`
-	TfstaticBagPath       string             `yaml:"tfstatic-bag-path"`
-	CostmapBagPath        string             `yaml:"costmap-bag-path"`
-	BagDataDir            string             `yaml:"bag-data-dir"`
-	BagCopyDir            string             `yaml:"bag-copy-dir"`
-	TriggersDir           string             `yaml:"triggers-dir"`
-	RpcPort               string             `yaml:"rpc-port"`
-	Triggers              []TriggerStruct    `yaml:"triggers"`
-	Hosts                 []HostStruct       `yaml:"hosts"`
-	Ros                   RosStruct          `yaml:"ros"`
-	Platform              PlatformStruct     `yaml:"platform"`
-	Disk                  DiskStruct         `yaml:"disk"`
-	Monitor               MonitorStruct      `yaml:"monitor"`
+	CollectLimit          CollectLimitStruct   `yaml:"collect-limit"`
+	CollectNumPlus        CollectNumPlusStruct `yaml:"collect-num-plus"`
+	DataDir               DataDirStruct        `yaml:"data-dir"`
+	MapBufFiles           []string             `yaml:"map-buf-files"`
+	FullCollect           bool                 `yaml:"full-collect"`
+	ConfigRefreshInterval int                  `yaml:"config-refresh-interval"` // 配置刷新时间间隔
+	BagNumber             int                  `yaml:"bag-number"`
+	TimeWindowSendGap     int                  `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
+	MapBagPath            string               `yaml:"map-bag-path"`
+	TfstaticBagPath       string               `yaml:"tfstatic-bag-path"`
+	CostmapBagPath        string               `yaml:"costmap-bag-path"`
+	BagDataDir            string               `yaml:"bag-data-dir"`
+	BagCopyDir            string               `yaml:"bag-copy-dir"`
+	TriggersDir           string               `yaml:"triggers-dir"`
+	RpcPort               string               `yaml:"rpc-port"`
+	Triggers              []TriggerStruct      `yaml:"triggers"`
+	Hosts                 []HostStruct         `yaml:"hosts"`
+	Ros                   RosStruct            `yaml:"ros"`
+	Platform              PlatformStruct       `yaml:"platform"`
+	Disk                  DiskStruct           `yaml:"disk"`
+	Monitor               MonitorStruct        `yaml:"monitor"`
 }
 
 //// Request 结构体定义

+ 39 - 34
aarch64/pjibot_patrol/common/config/c_cloud.go

@@ -52,6 +52,10 @@ type trigger struct {
 	Topics []string `yaml:"topics"`
 }
 
+type CollectNumPlusStruct struct {
+	Url string `yaml:"url"`
+}
+
 type CollectLimitStruct struct {
 	Url   string `yaml:"url"`
 	Flag  int    `yaml:"flag"`
@@ -62,42 +66,43 @@ type CollectLimitStruct struct {
 }
 
 type cloudConfig struct {
-	CollectLimit          CollectLimitStruct `yaml:"collect-limit"`
-	HasOneMsgTopic        bool               `yaml:"has-one-msg-topic"` // 是否存在只发单帧的话题
-	FullCollect           bool               `yaml:"full-collect"`
-	ConfigRefreshInterval int                `yaml:"config-refresh-interval"` // 配置刷新时间间隔
-	BagNumber             int                `yaml:"bag-number"`
-	TimeWindowSendGap     int                `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
-	BagDataDir            string             `yaml:"bag-data-dir"`
-	BagCopyDir            string             `yaml:"bag-copy-dir"`
-	TriggersDir           string             `yaml:"triggers-dir"`
-	RpcPort               string             `yaml:"rpc-port"`
-	Triggers              []trigger          `yaml:"triggers"`
-	Hosts                 []hostStruct       `yaml:"hosts"`
-	Ros                   ros                `yaml:"ros"`
-	Platform              platform           `yaml:"platform"`
-	Disk                  disk               `yaml:"disk"`
-	Monitor               MonitorStruct      `yaml:"monitor"`
+	CollectLimit          CollectLimitStruct   `yaml:"collect-limit"`
+	CollectNumPlus        CollectNumPlusStruct `yaml:"collect-num-plus"`
+	HasOneMsgTopic        bool                 `yaml:"has-one-msg-topic"` // 是否存在只发单帧的话题
+	FullCollect           bool                 `yaml:"full-collect"`
+	ConfigRefreshInterval int                  `yaml:"config-refresh-interval"` // 配置刷新时间间隔
+	BagNumber             int                  `yaml:"bag-number"`
+	TimeWindowSendGap     int                  `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
+	BagDataDir            string               `yaml:"bag-data-dir"`
+	BagCopyDir            string               `yaml:"bag-copy-dir"`
+	TriggersDir           string               `yaml:"triggers-dir"`
+	RpcPort               string               `yaml:"rpc-port"`
+	Triggers              []trigger            `yaml:"triggers"`
+	Hosts                 []hostStruct         `yaml:"hosts"`
+	Ros                   ros                  `yaml:"ros"`
+	Platform              platform             `yaml:"platform"`
+	Disk                  disk                 `yaml:"disk"`
+	Monitor               MonitorStruct        `yaml:"monitor"`
 }
 
-// Request 结构体定义
-type Request struct {
-	Type      string      `json:"type"`
-	UUID      string      `json:"uuid"`
-	CommandID string      `json:"commandId"`
-	Parameter interface{} `json:"parameter"`
-}
-
-// Response 结构体定义
-type Response struct {
-	CommandID string            `json:"commandId"`
-	ErrorCode string            `json:"errorCode"`
-	Results   map[string]string `json:"results"`
-	Status    string            `json:"status"`
-	Time      int64             `json:"time"`
-	Type      string            `json:"type"`
-	UUID      string            `json:"uuid"`
-}
+//// Request 结构体定义
+//type Request struct {
+//	Type      string      `json:"type"`
+//	UUID      string      `json:"uuid"`
+//	CommandID string      `json:"commandId"`
+//	Parameter interface{} `json:"parameter"`
+//}
+//
+//// Response 结构体定义
+//type Response struct {
+//	CommandID string            `json:"commandId"`
+//	ErrorCode string            `json:"errorCode"`
+//	Results   map[string]string `json:"results"`
+//	Status    string            `json:"status"`
+//	Time      int64             `json:"time"`
+//	Type      string            `json:"type"`
+//	UUID      string            `json:"uuid"`
+//}
 
 var (
 	CloudConfig      cloudConfig

+ 1 - 0
aarch64/pjibot_patrol/common/config/c_local.go

@@ -24,6 +24,7 @@ type localConfig struct {
 	OssBasePrefix        string     `yaml:"oss-base-prefix"`         // 云端配置文件的位置
 	CloudConfigFilename  string     `yaml:"cloud-config-filename"`   // 云端配置文件名称
 	CloudConfigLocalPath string     `yaml:"cloud-config-local-path"` // 将 oss 的配置文件下载到本地的位置
+	LocalWebsocketPort   string     `yaml:"local-websocket-port"`    // websocket端口号
 	RestartCmd           restartCmd `yaml:"restart-cmd"`             // 重启命令
 	EquipmentNo          string     // 当前设备的编号
 	SecretKey            string     // 当前设备的密钥

+ 225 - 0
aarch64/pjibot_patrol/common/config/c_websocket.go

@@ -0,0 +1,225 @@
+package config
+
+import (
+	"cicv-data-closedloop/common/config/c_log"
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/websocket"
+	"net/url"
+	"time"
+)
+
+var (
+	WsConn                 *websocket.Conn
+	reconnectionInProgress bool
+)
+
+// Request 结构体定义
+type Request struct {
+	Type      string      `json:"type"`
+	UUID      string      `json:"uuid"`
+	CommandID string      `json:"commandId"`
+	Parameter interface{} `json:"parameter"`
+}
+
+// Request1 结构体定义
+type Request1 struct {
+	Type      string      `json:"type"`
+	CommandID string      `json:"commandId"`
+	Parameter interface{} `json:"parameter"`
+}
+
+// Response 结构体定义
+type Response struct {
+	CommandID string            `json:"commandId"`
+	ErrorCode string            `json:"errorCode"`
+	Results   map[string]string `json:"results"`
+	Status    string            `json:"status"`
+	Time      int64             `json:"time"`
+	Type      string            `json:"type"`
+	UUID      string            `json:"uuid"`
+}
+
+// StatusMessage 状态消息 结构体定义
+type StatusMessage struct {
+	Type  string      `json:"type"`
+	Topic string      `json:"topic"`
+	Time  int64       `json:"time"`
+	Data  interface{} `json:"data"`
+}
+
+func keepAlive() {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		//c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err)
+	}
+
+	for {
+		select {
+		case <-ticker.C:
+			err := WsConn.WriteMessage(websocket.TextMessage, requestJSON)
+			if err != nil {
+				//c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err)
+				WsConn.Close()
+				//c_log.GlobalLogger.Info("重试连接websocket...")
+				ConnectWebsocket() // 重新连接
+				continue
+			}
+			//c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。")
+		}
+	}
+}
+
+func SendWebsocketHeartbeat(conn *websocket.Conn, maxRetries int) (bool, error) {
+
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	// 将请求JSON编码为字节
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		return false, fmt.Errorf("marshal request: %w", err)
+	}
+
+	// 发送WebSocket消息
+	err = conn.WriteMessage(websocket.TextMessage, requestJSON)
+	if err != nil {
+		return false, fmt.Errorf("write: %w", err)
+	}
+
+	count := 0
+	for {
+		if count > maxRetries {
+			return false, fmt.Errorf("保持websocket连接活跃,读取websocket消息超过最大重试次数。")
+		}
+		time.Sleep(1 * time.Second)
+		_, message, err := conn.ReadMessage()
+		if err != nil {
+			c_log.GlobalLogger.Error("保持websocket连接活跃,读取websocket消息 - 失败 ", err, " 继续读取消息。")
+			return false, err
+			//continue
+		}
+
+		var response Response
+		err = json.Unmarshal(message, &response)
+		c_log.GlobalLogger.Info("response ", response)
+		if err == nil && response.Type == "response" {
+			c_log.GlobalLogger.Info("response1 ", response)
+			return true, err
+		}
+		count++
+	}
+}
+
+func sendRequestAndAwaitResponse(ws *websocket.Conn) ([]byte, error) {
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err)
+		return nil, err
+	}
+
+	err = ws.WriteMessage(websocket.TextMessage, requestJSON)
+	if err != nil {
+		c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err)
+		return nil, err
+	}
+	c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。")
+
+	// 使用channel等待响应
+	responseChan := make(chan []byte)
+	go handleMessages(ws, responseChan)
+
+	select {
+	case response := <-responseChan:
+		c_log.GlobalLogger.Error("保持websocket连接活跃,等待心跳响应 - 成功。")
+		return response, nil
+	case <-time.After(60 * time.Second): // 设置超时时间
+		c_log.GlobalLogger.Error("保持websocket连接活跃,等待心跳响应 - 超时。")
+		close(responseChan)
+		return nil, fmt.Errorf("保持websocket连接活跃,等待心跳响应 - 超时。")
+	}
+}
+
+func handleMessages(ws *websocket.Conn, responseChan chan<- []byte) {
+	for {
+		time.Sleep(100 * time.Millisecond)
+		_, message, err := ws.ReadMessage()
+		if err != nil {
+			c_log.GlobalLogger.Error("保持websocket连接活跃,读取websocket消息 - 失败 ", err)
+			return
+		}
+
+		var response Response
+		if err := json.Unmarshal(message, &response); err == nil && response.Type == "response" {
+			responseChan <- message
+			close(responseChan)
+			return
+		}
+	}
+}
+
+func ConnectWebsocket() {
+	for {
+		// 防止重复调用
+		if reconnectionInProgress {
+			return
+		}
+		reconnectionInProgress = true
+		//c_log.GlobalLogger.Info("初始化Websocket连接 - 开始。")
+		serverURL := LocalConfig.Node.Ip + ":" + LocalConfig.LocalWebsocketPort
+		path := "/"
+
+		// 构建WebSocket连接URL
+		u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
+		//c_log.GlobalLogger.Info("URL:", u.String())
+
+		// 创建一个Dialer实例,用于建立WebSocket连接
+		dialer := websocket.Dialer{
+			ReadBufferSize:  1024,
+			WriteBufferSize: 1024,
+			// 可选:设置超时等
+			HandshakeTimeout: 5 * time.Minute,
+		}
+
+		// 建立WebSocket连接
+		coon, _, err := dialer.Dial(u.String(), nil)
+		if err != nil {
+			fmt.Println("err:", err)
+			//c_log.GlobalLogger.Error("初始化Websocket连接 - 失败。")
+			time.Sleep(5 * time.Second)
+			reconnectionInProgress = false
+			//c_log.GlobalLogger.Info("重试连接websocket...")
+			continue
+		}
+
+		WsConn = coon
+		//c_log.GlobalLogger.Info("初始化Websocket连接 - 成功。")
+		// 连接成功,退出循环
+		reconnectionInProgress = false
+		break
+	}
+}
+
+func InitWebsocketConfig() {
+	ConnectWebsocket()
+	// 保持连接活跃
+	go keepAlive()
+}

+ 3 - 0
aarch64/pjibot_patrol/control/main.go

@@ -25,6 +25,9 @@ func init() {
 	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
 	commonConfig.InitCloudConfig()
 	// 初始化rpc客户端,用于杀死旧的采集程序
+
+	// 初始化websocket配置
+	commonConfig.InitWebsocketConfig()
 }
 
 func main() {

+ 5 - 0
aarch64/pjibot_patrol/巡检机器人默认配置文件-cloud-config.yaml

@@ -1,10 +1,13 @@
 ---
 collect-limit:
   url: http://36.110.106.142:12341/web_server/collect_limit/can_collect
+  flag: 1 # 数采频率限制标志 0 - 关闭数采频率限制  1 - 开启数采频率限制
   day: 3
   week: 7
   month: 30
   year: 365
+collect-num-plus:
+  url: http://36.110.106.142:12341/web_server/collect_limit/plus_collect_num
 monitor:
   url: http://36.110.106.142:12341/web_server/monitor/insert
 platform:
@@ -17,6 +20,8 @@ config-refresh-interval: 60
 disk:
   name: /dev/nvme0n1p1 # 磁盘名称
   used: 108000000000 # 磁盘占用阈值,单位bytes
+  path:
+    - /home/pji/cicv-data-closedloop
 has-one-msg: false
 bag-data-dir: /home/pji/cicv-data-closedloop/data/
 bag-copy-dir: /home/pji/cicv-data-closedloop/copy/

+ 2 - 0
aarch64/pjibot_patrol/巡检机器人默认配置文件-local-config.yaml

@@ -12,6 +12,8 @@ oss-base-prefix: pjibot-patrol/
 cloud-config-filename: cloud-config.yaml
 # 将oss上的配置文件下载到本地的路径
 cloud-config-local-path: /home/pji/cicv-data-closedloop/config/cloud-config.yaml
+# websocket端口号
+local-websocket-port: 9002
 restart-cmd:
   dir: "/home/pji/cicv-data-closedloop/"
   name: "sh"