|
@@ -16,6 +16,7 @@ func RunTimeWindowConsumerQueue(nodeName string) {
|
|
log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
|
|
log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
|
|
outLoop:
|
|
outLoop:
|
|
for {
|
|
for {
|
|
|
|
+
|
|
// 收到自杀信号
|
|
// 收到自杀信号
|
|
select {
|
|
select {
|
|
case signal := <-ChannelKillConsume:
|
|
case signal := <-ChannelKillConsume:
|
|
@@ -31,23 +32,29 @@ outLoop:
|
|
}
|
|
}
|
|
default:
|
|
default:
|
|
}
|
|
}
|
|
|
|
+ // 每一秒扫一次
|
|
|
|
+ time.Sleep(time.Duration(1) * time.Second)
|
|
|
|
+
|
|
waitLength := len(global.TimeWindowConsumerQueue)
|
|
waitLength := len(global.TimeWindowConsumerQueue)
|
|
- if waitLength > 0 {
|
|
|
|
- log.GlobalLogger.Infof("待处理窗口个数为:%v", len(global.TimeWindowConsumerQueue))
|
|
|
|
- // 1 获取即将处理的窗口
|
|
|
|
- currentTimeWindow := global.TimeWindowConsumerQueue[0]
|
|
|
|
- util.RemoveHeaOfdTimeWindowConsumerQueue()
|
|
|
|
- log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
|
|
|
|
- // 2 获取目录
|
|
|
|
- dir := util.GetCopyDir(currentTimeWindow.FaultTime)
|
|
|
|
- bags := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
|
|
|
|
- bagNumber := len(bags)
|
|
|
|
- if bagNumber > currentTimeWindow.Length {
|
|
|
|
- bagNumber = currentTimeWindow.Length
|
|
|
|
- bags = bags[0:currentTimeWindow.Length]
|
|
|
|
- }
|
|
|
|
|
|
+ if waitLength == 0 {
|
|
|
|
+ continue outLoop
|
|
|
|
+ }
|
|
|
|
+ log.GlobalLogger.Infof("待处理窗口个数为:%v", len(global.TimeWindowConsumerQueue))
|
|
|
|
+ // 1 获取即将处理的窗口
|
|
|
|
+ currentTimeWindow := global.TimeWindowConsumerQueue[0]
|
|
|
|
+ util.RemoveHeaOfdTimeWindowConsumerQueue()
|
|
|
|
+ log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
|
|
|
|
+ // 2 获取目录
|
|
|
|
+ dir := util.GetCopyDir(currentTimeWindow.FaultTime)
|
|
|
|
+ bags := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
|
|
|
|
+ bagNumber := len(bags)
|
|
|
|
+ if bagNumber > currentTimeWindow.Length {
|
|
|
|
+ bagNumber = currentTimeWindow.Length
|
|
|
|
+ bags = bags[0:currentTimeWindow.Length]
|
|
|
|
+ }
|
|
|
|
|
|
- // 3 filter包,必须顺序执行
|
|
|
|
|
|
+ // 3 如果不是全量采集,则使用 filter 命令对 bag 包进行主题过滤。
|
|
|
|
+ if commonConfig.CloudConfig.FullCollect == false {
|
|
var filterTopics []string
|
|
var filterTopics []string
|
|
if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
|
|
if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
|
|
filterTopics = currentTimeWindow.MasterTopics
|
|
filterTopics = currentTimeWindow.MasterTopics
|
|
@@ -58,17 +65,16 @@ outLoop:
|
|
for _, topic := range filterTopics {
|
|
for _, topic := range filterTopics {
|
|
topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
|
|
topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
|
|
}
|
|
}
|
|
- for _, bag := range bags {
|
|
|
|
|
|
+ for i, bag := range bags {
|
|
oldName := bag
|
|
oldName := bag
|
|
newName := bag + "_filter"
|
|
newName := bag + "_filter"
|
|
filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
|
|
filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
|
|
- log.GlobalLogger.Info("执行bag包过滤命令:", filterCommand)
|
|
|
|
_, output, err := util.Execute("rosbag", filterCommand...)
|
|
_, output, err := util.Execute("rosbag", filterCommand...)
|
|
|
|
+ log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
|
|
if err != nil {
|
|
if err != nil {
|
|
log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
|
|
log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
- log.GlobalLogger.Info("filter bag包:", oldName)
|
|
|
|
// 删除旧文件
|
|
// 删除旧文件
|
|
util.DeleteFile(oldName)
|
|
util.DeleteFile(oldName)
|
|
// 将新文件改回旧文件名
|
|
// 将新文件改回旧文件名
|
|
@@ -77,65 +83,65 @@ outLoop:
|
|
continue outLoop
|
|
continue outLoop
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- // 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
|
|
|
|
- log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
|
|
- for _, bag := range bags {
|
|
|
|
- oldName := bag
|
|
|
|
- compressCommand := []string{"compress", "--bz2", oldName}
|
|
|
|
- if _, output, err := util.Execute("rosbag", compressCommand...); err != nil {
|
|
|
|
- log.GlobalLogger.Errorf("compress命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
|
|
+ // 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
|
|
|
|
+ log.GlobalLogger.Info("压缩 bag 数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
|
|
+ for i, bag := range bags {
|
|
|
|
+ oldName := bag
|
|
|
|
+ compressCommand := []string{"compress", "--bz2", oldName}
|
|
|
|
+ log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
|
|
|
|
+ if _, output, err := util.Execute("rosbag", compressCommand...); err != nil {
|
|
|
|
+ log.GlobalLogger.Errorf("compress命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
- // 5 upload,必须顺序执行
|
|
|
|
- log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
|
|
- start := time.Now()
|
|
|
|
- objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
|
- objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
|
|
|
|
- objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
|
|
|
+ }
|
|
|
|
+ // 5 upload,必须顺序执行
|
|
|
|
+ log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
|
|
+ start := time.Now()
|
|
|
|
+ objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
|
+ objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
|
|
|
|
+ objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
|
|
|
- for i, bag := range bags {
|
|
|
|
- bagSlice := strings.Split(bag, "/")
|
|
|
|
- log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】->【", objectKey1+bagSlice[len(bagSlice)-1], "】")
|
|
|
|
- err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
|
|
|
|
- if err != nil {
|
|
|
|
- log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
|
|
|
|
- // 在上传完成的包目录同级下添加一个目录同名的json
|
|
|
|
- triggerIds := make([]string, 0)
|
|
|
|
- for _, label := range currentTimeWindow.Labels {
|
|
|
|
- triggerIds = append(triggerIds, masterConfig.LabelMapTriggerId[label])
|
|
|
|
- }
|
|
|
|
- callBackMap := map[string]interface{}{
|
|
|
|
- "dataName": currentTimeWindow.FaultTime, // 云端callback程序会将该值加8小时,因为UTC和CSV时区相差8小时
|
|
|
|
- "dataSize": "", // 由合并程序补充
|
|
|
|
- "equipmentNo": commonConfig.LocalConfig.EquipmentNo,
|
|
|
|
- "secretKey": commonConfig.LocalConfig.SecretKey,
|
|
|
|
- "rosBagPath": objectKey2,
|
|
|
|
- "filePath": objectKey3,
|
|
|
|
- "taskId": commonConfig.PlatformConfig.TaskConfigId,
|
|
|
|
- "triggerId": triggerIds,
|
|
|
|
- }
|
|
|
|
- callBackJson, err := util.MapToJsonString(callBackMap)
|
|
|
|
|
|
+ for i, bag := range bags {
|
|
|
|
+ bagSlice := strings.Split(bag, "/")
|
|
|
|
+ log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
|
|
|
|
+ err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
|
|
if err != nil {
|
|
if err != nil {
|
|
- log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
|
|
|
|
- }
|
|
|
|
- err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
|
|
|
|
- if err != nil {
|
|
|
|
- log.GlobalLogger.Error("上传callback.json", callBackJson, "失败:", err)
|
|
|
|
|
|
+ log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
+ log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
|
|
|
|
+ // 在上传完成的包目录同级下添加一个目录同名的json
|
|
|
|
+ triggerIds := make([]string, 0)
|
|
|
|
+ for _, label := range currentTimeWindow.Labels {
|
|
|
|
+ triggerIds = append(triggerIds, masterConfig.LabelMapTriggerId[label])
|
|
|
|
+ }
|
|
|
|
+ callBackMap := map[string]interface{}{
|
|
|
|
+ "dataName": currentTimeWindow.FaultTime, // 云端callback程序会将该值加8小时,因为UTC和CSV时区相差8小时
|
|
|
|
+ "dataSize": "", // 由合并程序补充
|
|
|
|
+ "equipmentNo": commonConfig.LocalConfig.EquipmentNo,
|
|
|
|
+ "secretKey": commonConfig.LocalConfig.SecretKey,
|
|
|
|
+ "rosBagPath": objectKey2,
|
|
|
|
+ "filePath": objectKey3,
|
|
|
|
+ "taskId": commonConfig.PlatformConfig.TaskConfigId,
|
|
|
|
+ "triggerId": triggerIds,
|
|
|
|
+ }
|
|
|
|
+ callBackJson, err := util.MapToJsonString(callBackMap)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
|
|
|
|
+ }
|
|
|
|
+ err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
|
|
|
|
+ }
|
|
|
|
|
|
- // 删除本地所有已上传的bag文件
|
|
|
|
- log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
|
|
|
|
- if err = util.RemoveDir(dir); err != nil {
|
|
|
|
- continue outLoop
|
|
|
|
- }
|
|
|
|
|
|
+ // 删除本地所有已上传的bag文件
|
|
|
|
+ log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
|
|
|
|
+ if err = util.RemoveDir(dir); err != nil {
|
|
|
|
+ continue outLoop
|
|
}
|
|
}
|
|
- // 每一秒扫一次
|
|
|
|
- time.Sleep(time.Duration(1) * time.Second)
|
|
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|