package config import ( "cicv-data-closedloop/common/config/c_log" "cicv-data-closedloop/common/util" "crypto/md5" "encoding/hex" "encoding/json" "fmt" "sort" "strings" "time" ) type TaskTrigger struct { TriggerId string `json:"triggerId"` // 触发器id TriggerName string `json:"triggerName"` // 触发器名称 TriggerScriptPath string `json:"triggerScriptPath"` // 脚本在oss上的path ListenTopic string `json:"listenTopic"` // 触发器监听的topic CollectionTopic string `json:"collectionTopic"` // 触发器采集的topic CollectionPreSeconds int `json:"collectionPreSeconds"` // 采集前几秒的数据 CollectionAfterSeconds int `json:"collectionAfterSeconds"` // 采集后几秒的数据 Priority int `json:"priority"` // 优先级, 数值越小越优先 } type PlatformConfigStruct struct { TaskConfigId string `json:"taskConfigId"` // 任务ID TaskConfigName string `json:"taskConfigName"` // 任务名称 TaskType string `json:"taskType"` // 任务类型 StartTime string `json:"startTime"` // 开始时间(taskType为“single”有效) EndTime string `json:"endTime"` // 结束时间(taskType为“single”有效) DailyStartTime string `json:"dailyStartTime"` // 开始时间(taskType为“daily/weekly/monthly”有效) DailyEndTime string `json:"dailyEndTime"` // 结束时间(taskType为“daily/weekly/monthly”有效) CollectionStrategy int `json:"collectionStrategy"` // 采集策略 UploadStrategy int `json:"uploadStrategy"` // 上传策略 CompressionType int `json:"compressionType"` // 压缩类型 TaskTriggers []TaskTrigger `json:"taskTriggers"` // 触发器信息 } type response struct { Data []PlatformConfigStruct `json:"data"` Success bool `json:"success"` Message string `json:"message"` Code int `json:"code"` NowTime string `json:"nowTime"` } var ( PlatformConfig PlatformConfigStruct PlatformTasks []PlatformConfigStruct RecordTopics []string CurrTask PlatformConfigStruct // 正在运行中的任务 ) // InitPlatformConfig 初始化数据闭环平台的配置 func InitPlatformConfig() { if !LocalConfig.Internet { return } var err error c_log.GlobalLogger.Info("获取数据闭环平台配置 - 开始") // 1 如果车辆没有配置任务,则阻塞在这里,不启动任务 for { time.Sleep(time.Duration(2) * time.Second) PlatformTasks, err = GetConfig() if err != nil { c_log.GlobalLogger.Error("获取配置status失败:", err) continue } if checkPlatformTasks() { break } } c_log.GlobalLogger.Infof("获取数据闭环平台配置 - 成功。") //c_log.GlobalLogger.Infof("获取数据闭环平台配置 - 成功。【触发前】=【%v 秒】,触发后=【%v 秒】,【最大窗口】=【%v 秒】。", PlatformConfig.TaskBeforeTime, PlatformConfig.TaskAfterTime, PlatformConfig.TaskMaxTime) } func GetConfig() ([]PlatformConfigStruct, error) { queryParams := map[string]string{ "equipmentNo": LocalConfig.SecretKey, } // 1. 生成签名 sign := GenerateSign(CloudConfig.Platform.Ak, CloudConfig.Platform.Sk, queryParams) fmt.Println("sign:", sign) if sign == "" { return []PlatformConfigStruct{}, fmt.Errorf("生成签名失败。") } headers := map[string]string{ "ak": CloudConfig.Platform.Ak, "timestamp": fmt.Sprintf("%d", time.Now().Unix()), "sign": sign, } // 2 访问任务配置获取接口 resp, err := util.HttpGetStringAddHeadersResponseString( CloudConfig.Platform.UrlTask, headers, queryParams, ) if err != nil { c_log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err) return []PlatformConfigStruct{}, err } var result response err = json.Unmarshal([]byte(resp), &result) fmt.Println("result", result) if err != nil { c_log.GlobalLogger.Error("解析【返回结果】", resp, "失败:", err) return []PlatformConfigStruct{}, err } return result.Data, nil } func checkPlatformTasks() bool { if len(PlatformTasks) == 0 { c_log.GlobalLogger.Error("数据闭环平台中该车辆今天没有配置任务。") return false } return true } // GenerateSign 生成签名 func GenerateSign(ak, sk string, params map[string]string) string { // 1. 添加时间戳 params["timestamp"] = fmt.Sprintf("%d", time.Now().Unix()) // 2. 参数排序 keys := make([]string, 0, len(params)) for k := range params { keys = append(keys, k) } sort.Strings(keys) // 3. 拼接参数 var buffer strings.Builder for _, k := range keys { buffer.WriteString(k) buffer.WriteByte('=') buffer.WriteString(params[k]) buffer.WriteByte('&') } paramStr := buffer.String()[:len(buffer.String())-1] // 4. 计算MD5 signStr := paramStr + sk hash := md5.Sum([]byte(signStr)) return hex.EncodeToString(hash[:]) }