package config import ( "cicv-data-closedloop/common/config/c_log" "encoding/json" "fmt" "github.com/gorilla/websocket" "net/url" "time" ) var ( WsConn *websocket.Conn reconnectionInProgress bool ) // Request 结构体定义 type Request struct { Type string `json:"type"` UUID string `json:"uuid"` CommandID string `json:"commandId"` Parameter interface{} `json:"parameter"` } // Response 结构体定义 type Response struct { CommandID string `json:"commandId"` ErrorCode string `json:"errorCode"` Results map[string]string `json:"results"` Status string `json:"status"` Time int64 `json:"time"` Type string `json:"type"` UUID string `json:"uuid"` } // StatusMessage 状态消息 结构体定义 type StatusMessage struct { Type string `json:"type"` Topic string `json:"topic"` Time int64 `json:"time"` Data interface{} `json:"data"` } func keepAlive() { ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() request := Request{ Type: "request", UUID: "", CommandID: "heart", Parameter: nil, } requestJSON, err := json.Marshal(request) if err != nil { c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err) } for { select { case <-ticker.C: err := WsConn.WriteMessage(websocket.TextMessage, requestJSON) if err != nil { c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err) continue } _, msg, err := WsConn.ReadMessage() if err != nil { c_log.GlobalLogger.Error("保持websocket连接活跃,获取心跳响应 - 失败。", err) continue } c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。") // 将响应字节解码为JSON var responseMessage Response err = json.Unmarshal(msg, &responseMessage) if err != nil { c_log.GlobalLogger.Error("保持websocket连接活跃,解析心跳响应为json - 失败。", err) continue } fmt.Println("websocket responseMessage:", responseMessage) fmt.Println("websocket responseMessage results:", responseMessage.Results) if responseMessage.Status != "" { c_log.GlobalLogger.Info("websocket发送心跳请求解析状态为", responseMessage.Status) } if responseMessage.ErrorCode != "" { c_log.GlobalLogger.Info("websocket发送心跳请求解析故障码为", responseMessage.ErrorCode) } if responseMessage.Status != "OK" { WsConn.Close() c_log.GlobalLogger.Info("重试连接websocket...") ConnectWebsocket() // 重新连接 continue } } } } func ConnectWebsocket() { for { // 防止重复调用 if reconnectionInProgress { return } reconnectionInProgress = true c_log.GlobalLogger.Info("初始化Websocket连接 - 开始。") serverURL := LocalConfig.Node.Ip + ":" + LocalConfig.LocalWebsocketPort path := "/" // 构建WebSocket连接URL u := url.URL{Scheme: "ws", Host: serverURL, Path: path} c_log.GlobalLogger.Info("URL:", u.String()) // 创建一个Dialer实例,用于建立WebSocket连接 dialer := websocket.Dialer{ ReadBufferSize: 1024, WriteBufferSize: 1024, // 可选:设置超时等 HandshakeTimeout: 5 * time.Minute, } // 建立WebSocket连接 coon, _, err := dialer.Dial(u.String(), nil) if err != nil { fmt.Println("err:", err) c_log.GlobalLogger.Error("初始化Websocket连接 - 失败。") time.Sleep(5 * time.Second) reconnectionInProgress = false c_log.GlobalLogger.Info("重试连接websocket...") continue } WsConn = coon c_log.GlobalLogger.Info("初始化Websocket连接 - 成功。") // 连接成功,退出循环 reconnectionInProgress = false break } } func InitWebsocketConfig() { ConnectWebsocket() // 保持连接活跃 go keepAlive() }