|
@@ -46,13 +46,40 @@ outLoop:
|
|
|
entity.RemoveHeadOfTimeWindowConsumerQueue()
|
|
|
c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
|
|
|
|
|
|
- // 2 获取目录
|
|
|
+ // 2 获取目录和bags
|
|
|
dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
|
|
|
//bags, _ := commonUtil.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
|
|
|
- //bagNumber := len(bags)
|
|
|
- //if bagNumber > currentTimeWindow.Length {
|
|
|
- // bagNumber = currentTimeWindow.Length
|
|
|
- // bags = bags[0:currentTimeWindow.Length]
|
|
|
+ // 3 如果不是全量采集,则使用 filter 命令对 bag 包进行主题过滤。
|
|
|
+ // todo 需要采集回传的话题是根据触发器设置的,触发器设置了之后进行过滤
|
|
|
+ //if commonConfig.CloudConfig.FullCollect == false {
|
|
|
+ // filterTopics := commonConfig.CollectTopics
|
|
|
+ // if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
|
|
|
+ // filterTopics = currentTimeWindow.MasterTopics
|
|
|
+ // } else {
|
|
|
+ // filterTopics = currentTimeWindow.SlaveTopics
|
|
|
+ // }
|
|
|
+ // var topicsFilterSlice []string
|
|
|
+ // for _, topic := range filterTopics {
|
|
|
+ // topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
|
|
|
+ // }
|
|
|
+ // for i, bag := range bags {
|
|
|
+ // oldName := bag
|
|
|
+ // newName := bag + "_filter"
|
|
|
+ // filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
|
|
|
+ // _, output, err := commonUtil.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, filterCommand...)
|
|
|
+ // c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
|
|
|
+ // if err != nil {
|
|
|
+ // c_log.GlobalLogger.Errorf("filter 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
|
|
|
+ // continue
|
|
|
+ // }
|
|
|
+ // // 删除旧文件
|
|
|
+ // _ = commonUtil.DeleteFile(oldName)
|
|
|
+ // // 将新文件改回旧文件名
|
|
|
+ // if err = os.Rename(newName, oldName); err != nil {
|
|
|
+ // c_log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
|
|
|
+ // continue outLoop
|
|
|
+ // }
|
|
|
+ // }
|
|
|
//}
|
|
|
//
|
|
|
//if commonConfig.CloudConfig.CompressBag {
|