package svc

import (
	"fmt"
	"os"
	"strings"
	"time"

	commonConfig "cicv-data-closedloop/pji/common/cfg"
	"cicv-data-closedloop/pji/common/global"
	"cicv-data-closedloop/pji/common/log"
	"cicv-data-closedloop/pji/common/util"
	masterConfig "cicv-data-closedloop/pji/master/pkg/cfg"
)

// RunTimeWindowConsumerQueue serially consumes global.TimeWindowConsumerQueue.
//
// Each iteration first receives a value from ChannelKillConsume:
//   - 1: the value is put back on the channel; if the queue is empty the
//     function records the kill via AddKillTimes("5") and returns, otherwise
//     it goes on to process the head of the queue.
//   - anything else: AddKillTimes("5") is called and the function returns
//     immediately.
//
// For the window at the head of the queue it then, in order:
//  1. lists the ".bag" files in the window's copy directory, keeping at most
//     currentTimeWindow.Length of them;
//  2. runs "rosbag filter" on every bag, using MasterTopics when nodeName is
//     the first configured host and SlaveTopics otherwise, and renames the
//     filtered file back over the original name;
//  3. runs "rosbag compress --bz2" on every bag;
//  4. uploads every bag to OSS under the ".../data/<window>/" prefix;
//  5. uploads a callback.json describing the window under
//     ".../data_parse/<window>/";
//  6. removes the local directory.
//
// nodeName is the name of the node this process runs on.
func RunTimeWindowConsumerQueue(nodeName string) {
	log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
outLoop:
	for {
		// Wait for a kill signal.
		// NOTE(review): this receive blocks, so no window is consumed until a
		// value arrives on ChannelKillConsume, although the sleep at the bottom
		// of the loop suggests once-per-second polling was intended. A
		// non-blocking select with a default case may be what was meant —
		// confirm against the producer side before changing.
		sig := <-ChannelKillConsume
		if sig != 1 {
			// Any value other than 1 (documented as 2): stop immediately.
			AddKillTimes("5")
			return
		}
		// Put the signal back so the next iteration sees it again, then only
		// stop once the queue has been drained.
		ChannelKillConsume <- 1
		if len(global.TimeWindowConsumerQueue) == 0 {
			AddKillTimes("5")
			return
		}

		if len(global.TimeWindowConsumerQueue) > 0 {
			// 1. Take the window that is about to be processed off the queue.
			currentTimeWindow := global.TimeWindowConsumerQueue[0]
			util.RemoveHeaOfdTimeWindowConsumerQueue()
			log.GlobalLogger.Info("即将消费窗口:", currentTimeWindow)

			// 2. Locate the directory and the bags belonging to this window.
			dir := util.GetCopyDir(currentTimeWindow.FaultTime)
			bags := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
			bagNumber := len(bags)
			if bagNumber > currentTimeWindow.Length {
				// Keep exactly Length bags so that the slice, the progress
				// log and the object-key suffix all agree.
				bagNumber = currentTimeWindow.Length
				bags = bags[:bagNumber]
			}

			// 3. Filter the bags; this must run sequentially.
			filterTopics := currentTimeWindow.SlaveTopics
			if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
				filterTopics = currentTimeWindow.MasterTopics
			}
			topicsFilterSlice := make([]string, 0, len(filterTopics))
			for _, topic := range filterTopics {
				topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
			}
			// NOTE(review): the expression is wrapped in literal double quotes.
			// If util.Execute does not go through a shell, rosbag receives the
			// quotes as part of the expression — verify against util.Execute.
			filterExpr := "\"" + strings.Join(topicsFilterSlice, " or ") + "\""
			for _, bag := range bags {
				oldName := bag
				newName := bag + "_filter"
				log.GlobalLogger.Info("filter bag包:", oldName)
				util.Execute("rosbag", "filter", oldName, newName, filterExpr)
				// Delete the unfiltered file.
				util.DeleteFile(oldName)
				// Give the filtered file the original name.
				if err := os.Rename(newName, oldName); err != nil {
					log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
					continue outLoop
				}
			}

			// 4. Compress the bags; this must run sequentially. The original
			// comment states that each bag yields a compressed bag plus the
			// original with an ".orig.bag" suffix.
			log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
			for _, bag := range bags {
				util.Execute("rosbag", "compress", "--bz2", bag)
			}

			// 5. Upload; this must run sequentially.
			log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
			start := time.Now()
			keyPrefix := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo
			windowName := currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber)
			objectKey1 := keyPrefix + "/data/" + windowName + "/"
			objectKey2 := keyPrefix + "/data_merge/" + windowName + ".bag"
			objectKey3 := keyPrefix + "/data_parse/" + windowName + "/"
			for i, bag := range bags {
				bagSlice := strings.Split(bag, "/")
				objectKey := objectKey1 + bagSlice[len(bagSlice)-1]
				log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】->【", objectKey, "】")
				if err := commonConfig.OssBucket.PutObjectFromFile(objectKey, bag); err != nil {
					// A failed upload is logged and the remaining bags are
					// still attempted.
					log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
				}
			}
			log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))

			// Upload a callback.json describing this window.
			// triggerIds is kept non-nil so it encodes as [] rather than null.
			triggerIds := make([]string, 0, len(currentTimeWindow.Labels))
			for _, label := range currentTimeWindow.Labels {
				triggerIds = append(triggerIds, masterConfig.LabelMapTriggerId[label])
			}
			callBackMap := map[string]interface{}{
				"dataName":    currentTimeWindow.FaultTime, // the cloud callback program adds 8 hours to this value (UTC vs. local time zone offset)
				"dataSize":    "",                          // filled in by the merge program
				"equipmentNo": commonConfig.LocalConfig.EquipmentNo,
				"secretKey":   commonConfig.LocalConfig.SecretKey,
				"rosBagPath":  objectKey2,
				"filePath":    objectKey3,
				"taskId":      commonConfig.PlatformConfig.TaskConfigId,
				"triggerId":   triggerIds,
			}
			callBackJson, err := util.MapToJsonString(callBackMap)
			if err != nil {
				// Do not upload a callback.json built from a failed conversion.
				log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
			} else if err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson)); err != nil {
				log.GlobalLogger.Error("上传callback.json", callBackJson, "失败:", err)
			}

			// Remove the local directory of this window.
			// NOTE(review): this also runs when an upload above failed, which
			// discards the local copy — confirm this is the intended policy.
			util.RemoveDir(dir)
			log.GlobalLogger.Info(" -------- 处理窗口:", currentTimeWindow.FaultTime, " - 结束 -------")
		}

		// Scan once per second.
		time.Sleep(time.Second)
	}
}