package service

import (
	commonConfig "cicv-data-closedloop/aarch64/pjibot_guide/common/config"
	masterConfig "cicv-data-closedloop/aarch64/pjibot_guide/master/package/config"
	"cicv-data-closedloop/common/config/c_log"
	"cicv-data-closedloop/common/domain"
	"cicv-data-closedloop/common/entity"
	"cicv-data-closedloop/common/util"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"
)

// RunTimeWindowConsumerQueue drains entity.TimeWindowConsumerQueue until it is
// told to stop through ChannelKillConsume.
//
// Once per second it checks the queue. For every time window taken from the
// head of the queue it:
//
//  1. lists the ".bag" files in the window's copy directory, keeping at most
//     currentTimeWindow.Length of them;
//  2. unless CloudConfig.FullCollect is set, runs the rosbag "filter" command
//     on each bag so only the window's topics remain (MasterTopics when
//     nodeName equals CloudConfig.Hosts[0].Name, SlaveTopics otherwise);
//  3. uploads every bag to OSS, retrying each one until it succeeds;
//  4. uploads callback.json, the configured mapBuf files and a zip of the data
//     directory;
//  5. removes the local copy directory and, when the queue is empty, sends 2
//     on ChannelKillRosRecord.
//
// Kill signals: a value of 1 is put back on the channel and the function only
// returns once the queue is empty; any other value makes it return
// immediately. AddKillTimes("5") is called before returning in both cases.
func RunTimeWindowConsumerQueue(nodeName string) {
	// Pause between failed upload attempts so that a persistent failure does
	// not turn the retry loop into a busy loop that floods the log.
	const uploadRetryInterval = time.Second

	c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
outLoop:
	for {
		// Non-blocking check for a kill signal.
		select {
		case signal := <-ChannelKillConsume:
			if signal != 1 {
				// signal == 2: stop right away.
				AddKillTimes("5")
				return
			}
			// signal == 1: put the signal back and stop only once every
			// queued window has been processed.
			ChannelKillConsume <- 1
			if len(entity.TimeWindowConsumerQueue) == 0 {
				AddKillTimes("5")
				return
			}
		default:
		}

		// Scan the queue once per second.
		time.Sleep(time.Second)
		if len(entity.TimeWindowConsumerQueue) == 0 {
			continue
		}

		// 1. Take the window that is about to be processed.
		currentTimeWindow := entity.TimeWindowConsumerQueue[0]
		entity.RemoveHeadOfTimeWindowConsumerQueue()
		c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)

		// 2. Resolve the directory and list its bag files.
		dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
		// NOTE(review): the second return value is discarded here; presumably
		// it is an error worth logging - confirm against util.
		bags, _ := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
		bagNumber := len(bags)
		if bagNumber > currentTimeWindow.Length {
			bagNumber = currentTimeWindow.Length
			bags = bags[0:currentTimeWindow.Length]
		}

		// 3. When not collecting everything, use the filter command to keep
		// only the wanted topics in each bag.
		if !commonConfig.CloudConfig.FullCollect {
			var filterTopics []string
			if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
				filterTopics = currentTimeWindow.MasterTopics
			} else {
				filterTopics = currentTimeWindow.SlaveTopics
			}
			topicsFilterSlice := make([]string, 0, len(filterTopics))
			for _, topic := range filterTopics {
				topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
			}
			// The expression is the same for every bag, so build it once.
			filterExpression := "\"" + strings.Join(topicsFilterSlice, " or ") + "\""

			for i, bag := range bags {
				filteredName := bag + "_filter"
				filterCommand := []string{"filter", bag, filteredName, filterExpression}
				_, output, err := util.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, filterCommand...)
				c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
				if err != nil {
					// The unfiltered bag is left in place and uploaded as is.
					c_log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
					continue
				}
				// Delete the old file.
				util.DeleteFile(bag)
				// Give the filtered file the old file's name.
				if err = os.Rename(filteredName, bag); err != nil {
					// The original is already deleted, so give up on this window.
					c_log.GlobalLogger.Info("修改文件名", bag, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
					continue outLoop
				}
			}
		}

		// 4. Compress the bags; must run sequentially. Each bag would yield a
		// compressed bag plus the original with the suffix .orig.bag.
		// 5. todo: compression is disabled on the robot to keep the CPU from
		// being saturated.
		//c_log.GlobalLogger.Info("压缩 bag 数据包,故障时间为:", currentTimeWindow.FaultTime)
		//for i, bag := range bags {
		//	oldName := bag
		//	compressCommand := []string{"compress", "--bz2", oldName}
		//	c_log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
		//	if _, output, err := util.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, compressCommand...); err != nil {
		//		c_log.GlobalLogger.Errorf("compress命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
		//		continue
		//	}
		//}

		// 5. Upload; must run sequentially.
		c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
		start := time.Now()
		keyPrefix := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo
		windowName := currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber)
		objectKey1 := keyPrefix + "/data/" + windowName + "/"
		objectKey2 := keyPrefix + "/data_merge/" + windowName + ".bag"
		objectKey3 := keyPrefix + "/data_parse/" + windowName + "/"
		for i, bag := range bags {
			startOne := time.Now()
			bagKey := objectKey1 + filepath.Base(bag)
			// Retry until the upload succeeds; the lock is held only for the
			// duration of a single attempt.
			for {
				commonConfig.OssMutex.Lock()
				err := commonConfig.OssBucket.PutObjectFromFile(bagKey, bag)
				commonConfig.OssMutex.Unlock()
				if err == nil {
					break
				}
				c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
				time.Sleep(uploadRetryInterval)
			}
			c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", bagKey, "】")
		}
		c_log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))

		// Add a JSON file next to the uploaded bag directory.
		// triggerIds starts as an empty, non-nil slice and collects the
		// trigger id of every label found in LabelMapTriggerId.
		triggerIds := make([]string, 0, len(currentTimeWindow.Labels))
		for _, label := range currentTimeWindow.Labels {
			if value, ok := masterConfig.LabelMapTriggerId.Load(label); ok {
				triggerIds = append(triggerIds, value.(string))
			}
		}
		// dataName: the cloud callback program adds 8 hours to this value
		// because of the 8-hour time zone difference from UTC.
		// dataSize: left empty here, filled in by the merge program.
		callBackMap := map[string]interface{}{
			"dataName":    currentTimeWindow.FaultTime,
			"dataSize":    "",
			"equipmentNo": commonConfig.LocalConfig.EquipmentNo,
			"secretKey":   commonConfig.LocalConfig.SecretKey,
			"rosBagPath":  objectKey2,
			"filePath":    objectKey3,
			"taskId":      commonConfig.PlatformConfig.TaskConfigId,
			"triggerId":   triggerIds,
		}
		// NOTE(review): when the conversion fails the upload below still runs
		// with whatever string was returned - confirm this is intended.
		callBackJson, jsonErr := util.MapToJsonString(callBackMap)
		if jsonErr != nil {
			c_log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", jsonErr)
		}

		// Everything uploaded under objectKey3 shares one critical section.
		// The closure lets defer release the lock on every exit path.
		func() {
			commonConfig.OssMutex.Lock()
			defer commonConfig.OssMutex.Unlock()

			// Upload callback.json.
			if err := commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson)); err != nil {
				c_log.GlobalLogger.Error("上传 callback.json 文件失败:", err)
			}

			// Additionally collect the mapBuf files.
			for _, file := range commonConfig.CloudConfig.MapBufFiles {
				if err := commonConfig.OssBucket.PutObjectFromFile(objectKey3+filepath.Base(file), file); err != nil {
					c_log.GlobalLogger.Error("上传 mapBuf 文件失败:", err)
				}
			}

			// Zip the data directory and collect the archive.
			if err := util.ZipDir(commonConfig.CloudConfig.DataDir.Src, commonConfig.CloudConfig.DataDir.Dest, commonConfig.CloudConfig.DataDir.SrcSub); err != nil {
				c_log.GlobalLogger.Error("压缩data目录失败:", err)
			} else {
				c_log.GlobalLogger.Infof("压缩data目录【%v】->【%v】成功", commonConfig.CloudConfig.DataDir.Src, commonConfig.CloudConfig.DataDir.Dest)
			}
			// NOTE(review): the upload is attempted even when zipping failed,
			// so whatever file exists at Dest gets uploaded - confirm.
			dataZipKey := objectKey3 + "data.zip"
			if err := commonConfig.OssBucket.PutObjectFromFile(dataZipKey, commonConfig.CloudConfig.DataDir.Dest); err != nil {
				c_log.GlobalLogger.Error("上传 data 目录压缩文件失败:", err)
			} else {
				c_log.GlobalLogger.Infof("上传data目录压缩包【%v】->【%v】成功", commonConfig.CloudConfig.DataDir.Dest, dataZipKey)
			}
		}()

		// todo: collect the data directory without compressing it.
		//var filePaths []string // holds the file paths
		//err = filepath.WalkDir(commonConfig.CloudConfig.DataDir.Src, func(path string, d fs.DirEntry, err error) error { // walk the directory
		//	if err != nil {
		//		return err // propagate the error
		//	}
		//
		//	// Only keep files (skip directories).
		//	if !d.IsDir() {
		//		filePaths = append(filePaths, path) // remember the file path
		//	}
		//	return nil
		//})
		//if err != nil {
		//	c_log.GlobalLogger.Error("扫描 data 目录失败:", err)
		//	goto outLoop
		//}
		//
		//// Upload every file uncompressed.
		//for _, path := range filePaths {
		//	if strings.Contains(path, commonConfig.CloudConfig.DataDir.Exclude) {
		//		continue
		//	}
		//	relativePath := strings.Replace(path, commonConfig.CloudConfig.DataDir.Src, "", 1)
		//	ossKey := objectKey3 + "data/" + relativePath
		//	err = commonConfig.OssBucket.PutObjectFromFile(ossKey, path)
		//	if err != nil {
		//		c_log.GlobalLogger.Errorf("上传 data 目录内文件【%v】->【%v】失败:%v", path, ossKey, err)
		//		goto outLoop
		//	}
		//}
		//commonConfig.OssMutex.Unlock()

		// Delete all local bag files that have been uploaded.
		c_log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
		if err := util.RemoveDir(dir); err != nil {
			// As before, a failed cleanup skips the restart check below; the
			// failure is now logged instead of being dropped silently.
			c_log.GlobalLogger.Errorf("删除目录【%v】失败:%v", dir, err)
			continue
		}
		if len(entity.TimeWindowConsumerQueue) == 0 {
			c_log.GlobalLogger.Infof("已处理所有窗口,重启 record 命令。")
			ChannelKillRosRecord <- 2
		}
	}
}