package config import ( "cicv-data-closedloop/common/config/c_log" "cicv-data-closedloop/common/util" "crypto/md5" "encoding/hex" "encoding/json" "fmt" "sort" "strings" "time" ) type taskTrigger struct { TriggerId int `json:"triggerId"` // 触发器id TriggerName string `json:"triggerName"` // 触发器名称 TriggerScriptPath string `json:"triggerScriptPath"` // 脚本在oss上的path ListenTopic string `json:"listenTopic"` // 触发器监听的topic CollectionTopic string `json:"collectionTopic"` // 触发器采集的topic CollectionPreSeconds string `json:"collectionPreSeconds"` // 采集前几秒的数据 CollectionAfterSeconds string `json:"collectionAfterSeconds"` // 采集后几秒的数据 Priority string `json:"priority"` // 优先级, 数值越小越优先 } type PlatformConfigStruct struct { TaskConfigId string `json:"taskConfigId"` // 任务ID TaskConfigName string `json:"taskConfigName"` // 任务名称 TaskType string `json:"taskType"` // 任务类型 StartTime string `json:"startTime"` // 开始时间(taskType为“single”有效) EndTime string `json:"endTime"` // 结束时间(taskType为“single”有效) DailyStartTime string `json:"dailyStartTime"` // 开始时间(taskType为“daily/weekly/monthly”有效) DailyEndTime string `json:"dailyEndTime"` // 结束时间(taskType为“daily/weekly/monthly”有效) CollectionStrategy int `json:"collectionStrategy"` // 采集策略 UploadStrategy int `json:"uploadStrategy"` // 上传策略 CompressionType int `json:"compressionType"` // 压缩类型 TaskTriggers []taskTrigger `json:"taskTriggers"` // 触发器信息 } type response struct { Data []PlatformConfigStruct `json:"data"` Success bool `json:"success"` Message string `json:"message"` Code int `json:"code"` NowTime string `json:"nowTime"` } var ( PlatformConfig PlatformConfigStruct PlatformConfig1 []PlatformConfigStruct RecordTopics []string ) // InitPlatformConfig 初始化数据闭环平台的配置 func InitPlatformConfig() { if !LocalConfig.Internet { return } var err error c_log.GlobalLogger.Info("获取数据闭环平台配置 - 开始") // 1 如果车辆没有配置任务,则阻塞在这里,不启动任务 for { time.Sleep(time.Duration(2) * time.Second) PlatformConfig1, err = GetConfig() if err != nil { c_log.GlobalLogger.Error("获取配置status失败:", err) continue } //if checkPlatformConfig() { // RecordTopics = strings.Split(PlatformConfig.EquipmentTopic, ",") // // 去掉首尾空格 // for i, topic := range RecordTopics { // RecordTopics[i] = strings.TrimSpace(topic) // } // break //} } //c_log.GlobalLogger.Infof("获取数据闭环平台配置 - 成功。【触发前】=【%v 秒】,触发后=【%v 秒】,【最大窗口】=【%v 秒】。", PlatformConfig.TaskBeforeTime, PlatformConfig.TaskAfterTime, PlatformConfig.TaskMaxTime) } /* { "data": { "accessToken": "YWRmYWRzZmFzZGZhZHNmYWRmYWRm=", "expireTime": "28800", "equipmentNo": "robot-001" }, "success": true, "message": "ok", "code": 1, "nowTime": "2023-12-09 22:41:00" } */ // 认证接口,获取access_token func GetAccessToken() (string, error) { url := &CloudConfig.Platform.UrlDeviceAuth param := &map[string]string{ "equipmentNo": LocalConfig.EquipmentNo, "secretKey": LocalConfig.SecretKey, } respJson, err := util.HttpPostJsonResponseString( *url, *param, ) if err != nil { return "", nil } respMap, err := util.JsonStringToMap(respJson) if err != nil { c_log.GlobalLogger.Errorf("解析返回结果【%v】失败,请求地址为【%v】,请求参数为【%v】:%v", respJson, *url, *param, err) return "", nil } dataMap, ok := respMap["data"].(map[string]interface{}) if !ok { c_log.GlobalLogger.Error("解析返回结果.data", dataMap, "失败:", err) return "", nil } return dataMap["accessToken"].(string), nil } /* { "data": { "status": "UNCHANGE" "taskConfigld": "xxx" }, "success": true, "message": "ok", "code": 1, "nowTime": "2023-12-09 21:08:49" } */ //GetStatus 根据taskConfigId获取任务status,如果传入空代表车端没有配置,直接获取新的配置 func GetStatus(taskConfigId string) (string, error) { token, err := GetAccessToken() if err != nil { return "", err } resp, err := util.HttpGetStringAddHeadersResponseString( CloudConfig.Platform.UrlTaskPoll, map[string]string{ "authorization": token, }, map[string]string{ "equipmentNo": LocalConfig.EquipmentNo, "taskConfigId": taskConfigId, }, ) if err != nil { c_log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err) return "", err } respMap, err := util.JsonStringToMap(resp) if err != nil { c_log.GlobalLogger.Error("解析【返回结果1】", resp, "失败:", err) return "", err } dataMap, ok := respMap["data"].(map[string]interface{}) if !ok { c_log.GlobalLogger.Errorf("解析【返回结果.data】的类型不是(map[string]interface{}),【dataMap】=%v", dataMap) return "", err } return dataMap["status"].(string), nil } func GetConfig() ([]PlatformConfigStruct, error) { queryParams := map[string]string{ "equipmentNo": LocalConfig.SecretKey, } // 1. 生成签名 sign := GenerateSign(CloudConfig.Platform.Ak, CloudConfig.Platform.Sk, queryParams) fmt.Println("sign:", sign) if sign == "" { return []PlatformConfigStruct{}, fmt.Errorf("生成签名失败。") } headers := map[string]string{ "ak": CloudConfig.Platform.Ak, "timestamp": fmt.Sprintf("%d", time.Now().Unix()), "sign": sign, } // 2 访问任务配置获取接口 resp, err := util.HttpGetStringAddHeadersResponseString( CloudConfig.Platform.UrlTask, headers, queryParams, ) if err != nil { c_log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err) return []PlatformConfigStruct{}, err } var result response err = json.Unmarshal([]byte(resp), &result) fmt.Println("result", result) if err != nil { c_log.GlobalLogger.Error("解析【返回结果】", resp, "失败:", err) return []PlatformConfigStruct{}, err } return result.Data, nil } func checkPlatformConfig() bool { if PlatformConfig.TaskConfigId == "" { c_log.GlobalLogger.Error("数据闭环平台没有配置任务。") return false } //if PlatformConfig.EquipmentTopic == "" { // c_log.GlobalLogger.Error("数据闭环平台没有配置topic序列。") // return false //} return true } // GenerateSign 生成签名 func GenerateSign(ak, sk string, params map[string]string) string { // 1. 添加时间戳 params["timestamp"] = fmt.Sprintf("%d", time.Now().Unix()) // 2. 参数排序 keys := make([]string, 0, len(params)) for k := range params { keys = append(keys, k) } sort.Strings(keys) // 3. 拼接参数 var buffer strings.Builder for _, k := range keys { buffer.WriteString(k) buffer.WriteByte('=') buffer.WriteString(params[k]) buffer.WriteByte('&') } paramStr := buffer.String()[:len(buffer.String())-1] // 4. 计算MD5 signStr := paramStr + sk hash := md5.Sum([]byte(signStr)) return hex.EncodeToString(hash[:]) }