|
@@ -31,7 +31,10 @@ outLoop:
|
|
}
|
|
}
|
|
default:
|
|
default:
|
|
}
|
|
}
|
|
- if len(global.TimeWindowConsumerQueue) > 0 {
|
|
|
|
|
|
+
|
|
|
|
+ waitLength := len(global.TimeWindowConsumerQueue)
|
|
|
|
+ log.GlobalLogger.Infof("待处理窗口个数为:%v", len(global.TimeWindowConsumerQueue))
|
|
|
|
+ if waitLength > 0 {
|
|
// 1 获取即将处理的窗口
|
|
// 1 获取即将处理的窗口
|
|
currentTimeWindow := global.TimeWindowConsumerQueue[0]
|
|
currentTimeWindow := global.TimeWindowConsumerQueue[0]
|
|
util.RemoveHeaOfdTimeWindowConsumerQueue()
|
|
util.RemoveHeaOfdTimeWindowConsumerQueue()
|
|
@@ -60,19 +63,18 @@ outLoop:
|
|
for _, bag := range bags {
|
|
for _, bag := range bags {
|
|
oldName := bag
|
|
oldName := bag
|
|
newName := bag + "_filter"
|
|
newName := bag + "_filter"
|
|
- var command []string
|
|
|
|
- command = append(command, "filter")
|
|
|
|
- command = append(command, oldName)
|
|
|
|
- command = append(command, newName)
|
|
|
|
- command = append(command, "\""+strings.Join(topicsFilterSlice, " or ")+"\"")
|
|
|
|
- log.GlobalLogger.Info("执行bag包过滤命令:", command)
|
|
|
|
- //util2.Execute("rosbag", command...)
|
|
|
|
|
|
+ filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
|
|
|
|
+ log.GlobalLogger.Info("执行bag包过滤命令:", filterCommand)
|
|
|
|
+ _, output, err := util.Execute("rosbag", filterCommand...)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
log.GlobalLogger.Info("filter bag包:", oldName)
|
|
log.GlobalLogger.Info("filter bag包:", oldName)
|
|
// 删除旧文件
|
|
// 删除旧文件
|
|
util.DeleteFile(oldName)
|
|
util.DeleteFile(oldName)
|
|
// 将新文件改回旧文件名
|
|
// 将新文件改回旧文件名
|
|
- err := os.Rename(newName, oldName)
|
|
|
|
- if err != nil {
|
|
|
|
|
|
+ if err = os.Rename(newName, oldName); err != nil {
|
|
log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
|
|
log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
|
|
continue outLoop
|
|
continue outLoop
|
|
}
|
|
}
|
|
@@ -82,7 +84,11 @@ outLoop:
|
|
log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
for _, bag := range bags {
|
|
for _, bag := range bags {
|
|
oldName := bag
|
|
oldName := bag
|
|
- util.Execute("rosbag", "compress", "--bz2", oldName)
|
|
|
|
|
|
+ compressCommand := []string{"compress", "--bz2", oldName}
|
|
|
|
+ if _, output, err := util.Execute("rosbag", compressCommand...); err != nil {
|
|
|
|
+ log.GlobalLogger.Errorf("compress命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
}
|
|
}
|
|
// 5 upload,必须顺序执行
|
|
// 5 upload,必须顺序执行
|
|
log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
@@ -127,8 +133,10 @@ outLoop:
|
|
}
|
|
}
|
|
|
|
|
|
// 删除本地所有已上传的bag文件
|
|
// 删除本地所有已上传的bag文件
|
|
- util.RemoveDir(dir)
|
|
|
|
log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
|
|
log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
|
|
|
|
+ if err = util.RemoveDir(dir); err != nil {
|
|
|
|
+ continue outLoop
|
|
|
|
+ }
|
|
}
|
|
}
|
|
// 每一秒扫一次
|
|
// 每一秒扫一次
|
|
time.Sleep(time.Duration(1) * time.Second)
|
|
time.Sleep(time.Duration(1) * time.Second)
|