|
@@ -9,13 +9,14 @@ import (
|
|
commonUtil "cicv-data-closedloop/common/util"
|
|
commonUtil "cicv-data-closedloop/common/util"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
- "os"
|
|
|
|
"strings"
|
|
"strings"
|
|
"time"
|
|
"time"
|
|
)
|
|
)
|
|
|
|
|
|
// RunTimeWindowConsumerQueue 依次上传时间窗口
|
|
// RunTimeWindowConsumerQueue 依次上传时间窗口
|
|
-func RunTimeWindowConsumerQueue(nodeName string) {
|
|
|
|
|
|
+func RunTimeWindowConsumerQueue() {
|
|
|
|
+
|
|
|
|
+ //nodeName:= commonConfig.LocalConfig.Node.Name
|
|
c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
|
|
c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
|
|
outLoop:
|
|
outLoop:
|
|
for { // 串行处理
|
|
for { // 串行处理
|
|
@@ -48,83 +49,51 @@ outLoop:
|
|
|
|
|
|
// 2 获取目录
|
|
// 2 获取目录
|
|
dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
|
|
dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
|
|
- bags, _ := commonUtil.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
|
|
|
|
- bagNumber := len(bags)
|
|
|
|
- if bagNumber > currentTimeWindow.Length {
|
|
|
|
- bagNumber = currentTimeWindow.Length
|
|
|
|
- bags = bags[0:currentTimeWindow.Length]
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // 3 如果不是全量采集,则使用 filter 命令对 bag 包进行主题过滤。
|
|
|
|
- if commonConfig.CloudConfig.FullCollect == false {
|
|
|
|
- var filterTopics []string
|
|
|
|
- if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
|
|
|
|
- filterTopics = currentTimeWindow.MasterTopics
|
|
|
|
- } else {
|
|
|
|
- filterTopics = currentTimeWindow.SlaveTopics
|
|
|
|
- }
|
|
|
|
- var topicsFilterSlice []string
|
|
|
|
- for _, topic := range filterTopics {
|
|
|
|
- topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
|
|
|
|
- }
|
|
|
|
- for i, bag := range bags {
|
|
|
|
- oldName := bag
|
|
|
|
- newName := bag + "_filter"
|
|
|
|
- filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
|
|
|
|
- _, output, err := commonUtil.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, filterCommand...)
|
|
|
|
- c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
|
|
|
|
- if err != nil {
|
|
|
|
- c_log.GlobalLogger.Errorf("filter 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- // 删除旧文件
|
|
|
|
- _ = commonUtil.DeleteFile(oldName)
|
|
|
|
- // 将新文件改回旧文件名
|
|
|
|
- if err = os.Rename(newName, oldName); err != nil {
|
|
|
|
- c_log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
|
|
|
|
- continue outLoop
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if commonConfig.CloudConfig.CompressBag == true {
|
|
|
|
- // 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
|
|
|
|
- c_log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
|
|
- for i, bag := range bags {
|
|
|
|
- oldName := bag
|
|
|
|
- compressCommand := []string{"compress", "--bz2", oldName}
|
|
|
|
- c_log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
|
|
|
|
- if _, output, err := commonUtil.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, compressCommand...); err != nil {
|
|
|
|
- c_log.GlobalLogger.Errorf("compress 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ //bags, _ := commonUtil.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
|
|
|
|
+ //bagNumber := len(bags)
|
|
|
|
+ //if bagNumber > currentTimeWindow.Length {
|
|
|
|
+ // bagNumber = currentTimeWindow.Length
|
|
|
|
+ // bags = bags[0:currentTimeWindow.Length]
|
|
|
|
+ //}
|
|
|
|
+ //
|
|
|
|
+ //if commonConfig.CloudConfig.CompressBag {
|
|
|
|
+ // // 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
|
|
|
|
+ // c_log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
|
|
+ // for i, bag := range bags {
|
|
|
|
+ // oldName := bag
|
|
|
|
+ // compressCommand := []string{"compress", "--bz2", oldName}
|
|
|
|
+ // c_log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
|
|
|
|
+ // if _, output, err := commonUtil.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, compressCommand...); err != nil {
|
|
|
|
+ // c_log.GlobalLogger.Errorf("compress 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
|
|
|
|
+ // continue
|
|
|
|
+ // }
|
|
|
|
+ // }
|
|
|
|
+ //}
|
|
|
|
|
|
// 5 upload,必须顺序执行
|
|
// 5 upload,必须顺序执行
|
|
c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
- start := time.Now()
|
|
|
|
- objectKey1 := commonConfig.OssEquBasePrefix + "data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
|
- objectKey2 := commonConfig.OssEquBasePrefix + "data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
|
|
|
|
- objectKey3 := commonConfig.OssEquBasePrefix + "data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
|
- for i, bag := range bags {
|
|
|
|
- startOne := time.Now()
|
|
|
|
- bagSlice := strings.Split(bag, "/")
|
|
|
|
- for {
|
|
|
|
- {
|
|
|
|
- //commonConfig.OssMutex.Lock()
|
|
|
|
- //err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
|
|
|
|
- //commonConfig.OssMutex.Unlock()
|
|
|
|
- //if err != nil {
|
|
|
|
- // c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
|
|
|
|
- // continue
|
|
|
|
- //}
|
|
|
|
- }
|
|
|
|
- c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
|
|
|
|
- break
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- c_log.GlobalLogger.Info("上传完成,总耗时:", time.Since(start))
|
|
|
|
|
|
+ //start := time.Now()
|
|
|
|
+ //objectKey1 := commonConfig.OssEquBasePrefix + "data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
|
+ objectKey2 := commonConfig.OssEquBasePrefix + "data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", currentTimeWindow.Length) + ".bag"
|
|
|
|
+ objectKey3 := commonConfig.OssEquBasePrefix + "data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", currentTimeWindow.Length) + "/"
|
|
|
|
+ //for i, bag := range bags {
|
|
|
|
+ // startOne := time.Now()
|
|
|
|
+ // bagSlice := strings.Split(bag, "/")
|
|
|
|
+ // for {
|
|
|
|
+ // {
|
|
|
|
+ // commonConfig.OssMutex.Lock()
|
|
|
|
+ // err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
|
|
|
|
+ // commonConfig.OssMutex.Unlock()
|
|
|
|
+ // if err != nil {
|
|
|
|
+ // c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
|
|
|
|
+ // continue
|
|
|
|
+ // }
|
|
|
|
+ // }
|
|
|
|
+ // c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
|
|
|
|
+ // break
|
|
|
|
+ // }
|
|
|
|
+ //}
|
|
|
|
+ //c_log.GlobalLogger.Info("上传完成,总耗时:", time.Since(start))
|
|
if commonConfig.LocalConfig.Node.Name == "node1" {
|
|
if commonConfig.LocalConfig.Node.Name == "node1" {
|
|
// 在上传完成的包目录同级下添加一个目录同名的json
|
|
// 在上传完成的包目录同级下添加一个目录同名的json
|
|
var triggerIds []string
|
|
var triggerIds []string
|