package config

import (
	"encoding/json"
	"fmt"
	"net/url"
	"time"

	"cicv-data-closedloop/common/config/c_log"
	"github.com/gorilla/websocket"
)

var (
	// WsConn is the package-wide websocket connection. It is assigned by
	// ConnectWebsocket and replaced whenever keepAlive reconnects.
	WsConn *websocket.Conn
	// reconnectionInProgress is set while ConnectWebsocket is dialing so that
	// a second call returns immediately. It is a plain bool with no
	// synchronization.
	reconnectionInProgress bool
)

// Request is a websocket request message that carries a UUID.
type Request struct {
	Type      string      `json:"type"`
	UUID      string      `json:"uuid"`
	CommandID string      `json:"commandId"`
	Parameter interface{} `json:"parameter"`
}

// Request1 is a websocket request message without a UUID; it is the shape
// used for the heartbeat request.
type Request1 struct {
	Type      string      `json:"type"`
	CommandID string      `json:"commandId"`
	Parameter interface{} `json:"parameter"`
}

// Response is a websocket response message.
type Response struct {
	CommandID string            `json:"commandId"`
	ErrorCode string            `json:"errorCode"`
	Results   map[string]string `json:"results"`
	Status    string            `json:"status"`
	Time      int64             `json:"time"`
	Type      string            `json:"type"`
	UUID      string            `json:"uuid"`
}

// StatusMessage is a status message received over the websocket.
type StatusMessage struct {
	Type  string      `json:"type"`
	Topic string      `json:"topic"`
	Time  int64       `json:"time"`
	Data  interface{} `json:"data"`
}

// keepAlive sends a heartbeat over WsConn every 30 seconds and re-establishes
// the connection when the heartbeat fails, times out, or is answered with a
// status other than "OK". It never returns.
func keepAlive() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		response, err := sendRequestAndAwaitResponse(WsConn)
		if err != nil || response == nil {
			c_log.GlobalLogger.Error("保持websocket连接活跃,此次请求未获取有效数据。", err)
			// A failed write or a missing reply means the connection is no
			// longer usable. Closing it also unblocks any reader goroutine
			// still waiting in ReadMessage, so no stale reader survives to
			// compete with the next heartbeat.
			reconnectAfterHeartbeatFailure()
			continue
		}

		var responseMessage Response
		if err := json.Unmarshal(response, &responseMessage); err != nil {
			c_log.GlobalLogger.Error("保持websocket连接活跃,解析websocket响应 - 失败。", err)
			continue
		}
		// Diagnostic dump of the heartbeat reply; this is not an error.
		c_log.GlobalLogger.Info("responseMessage", responseMessage)

		if responseMessage.Status != "OK" {
			reconnectAfterHeartbeatFailure()
		}
	}
}

// reconnectAfterHeartbeatFailure closes the current connection, if any, and
// dials a new one through ConnectWebsocket.
func reconnectAfterHeartbeatFailure() {
	if WsConn != nil {
		// The connection is being discarded; a close error is not actionable.
		_ = WsConn.Close()
	}
	c_log.GlobalLogger.Info("重试连接websocket...")
	ConnectWebsocket() // reconnect
}

// sendRequestAndAwaitResponse writes a heartbeat request to ws and waits up
// to 60 seconds for a message whose type is "response".
//
// It returns the raw response bytes, or an error when ws is nil, the request
// cannot be marshalled or written, the connection fails while reading, or the
// wait times out.
func sendRequestAndAwaitResponse(ws *websocket.Conn) ([]byte, error) {
	if ws == nil {
		return nil, fmt.Errorf("保持websocket连接活跃,websocket连接未初始化。")
	}

	request := Request1{
		Type:      "request",
		CommandID: "heart",
		Parameter: nil,
	}
	requestJSON, err := json.Marshal(request)
	if err != nil {
		c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err)
		return nil, err
	}

	if err := ws.WriteMessage(websocket.TextMessage, requestJSON); err != nil {
		c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err)
		return nil, err
	}
	c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。")

	// Wait for the response on a channel. The buffer of one lets the reader
	// goroutine deliver a late response and exit even after this function has
	// already timed out. Only the reader closes the channel, so it can never
	// send on a closed channel.
	responseChan := make(chan []byte, 1)
	go handleMessages(ws, responseChan)

	timeout := time.NewTimer(60 * time.Second) // response timeout
	defer timeout.Stop()

	select {
	case response, ok := <-responseChan:
		if !ok {
			// The reader closed the channel without delivering a message,
			// which means reading from the connection failed.
			return nil, fmt.Errorf("保持websocket连接活跃,等待心跳响应 - 连接读取失败。")
		}
		c_log.GlobalLogger.Info("保持websocket连接活跃,等待心跳响应 - 成功。")
		return response, nil
	case <-timeout.C:
		c_log.GlobalLogger.Error("保持websocket连接活跃,等待心跳响应 - 超时。")
		return nil, fmt.Errorf("保持websocket连接活跃,等待心跳响应 - 超时。")
	}
}

// handleMessages reads messages from ws until it finds one that decodes as a
// Response with type "response", sends that raw message on responseChan, and
// returns. Messages that do not match are skipped.
//
// responseChan is closed on every exit path. A receiver that sees the channel
// closed without a value knows the read failed. The receiver must not close
// the channel itself.
func handleMessages(ws *websocket.Conn, responseChan chan<- []byte) {
	defer close(responseChan)

	for {
		// NOTE(review): ReadMessage already blocks, so this pause looks like
		// throttling for bursts of non-response messages; confirm it is needed.
		time.Sleep(100 * time.Millisecond)

		_, message, err := ws.ReadMessage()
		if err != nil {
			c_log.GlobalLogger.Error("保持websocket连接活跃,读取websocket消息 - 失败 ", err)
			return
		}

		var response Response
		if json.Unmarshal(message, &response) != nil || response.Type != "response" {
			continue
		}
		responseChan <- message
		return
	}
}

// ConnectWebsocket dials the websocket server at
// LocalConfig.Node.Ip:LocalConfig.LocalWebsocketPort and stores the
// connection in WsConn, retrying every 5 seconds until the dial succeeds.
//
// If another call is already in progress it returns immediately without
// dialing.
func ConnectWebsocket() {
	// Guard against repeated calls. The flag stays set for the whole retry
	// loop so that a second caller cannot slip in between attempts.
	if reconnectionInProgress {
		return
	}
	reconnectionInProgress = true
	defer func() { reconnectionInProgress = false }()

	for {
		c_log.GlobalLogger.Info("初始化Websocket连接 - 开始。")
		serverURL := LocalConfig.Node.Ip + ":" + LocalConfig.LocalWebsocketPort
		path := "/"

		// Build the websocket URL.
		u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
		c_log.GlobalLogger.Info("URL:", u.String())

		// Dialer used to establish the websocket connection.
		dialer := websocket.Dialer{
			ReadBufferSize:  1024,
			WriteBufferSize: 1024,
			// Optional: timeouts and similar settings.
			HandshakeTimeout: 5 * time.Minute,
		}

		// Establish the websocket connection.
		conn, _, err := dialer.Dial(u.String(), nil)
		if err != nil {
			// Record the cause in the log rather than only on stdout.
			c_log.GlobalLogger.Error("初始化Websocket连接 - 失败。", err)
			time.Sleep(5 * time.Second)
			c_log.GlobalLogger.Info("重试连接websocket...")
			continue
		}

		WsConn = conn
		c_log.GlobalLogger.Info("初始化Websocket连接 - 成功。")
		// Connected; leave the retry loop.
		return
	}
}

// InitWebsocketConfig establishes the websocket connection and then starts
// the keepAlive heartbeat loop in a background goroutine.
func InitWebsocketConfig() {
	ConnectWebsocket()
	// Keep the connection alive.
	go keepAlive()
}