|
@@ -16,7 +16,7 @@ import (
|
|
|
// RunTimeWindowConsumerQueue 依次上传时间窗口
|
|
|
func RunTimeWindowConsumerQueue() {
|
|
|
|
|
|
- //nodeName:= commonConfig.LocalConfig.Node.Name
|
|
|
+ nodeName := commonConfig.LocalConfig.Node.Name
|
|
|
c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
|
|
|
outLoop:
|
|
|
for { // 串行处理
|
|
@@ -48,7 +48,13 @@ outLoop:
|
|
|
|
|
|
// 2 获取目录和bags
|
|
|
dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
|
|
|
- //bags, _ := commonUtil.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
|
|
|
+ bags, _ := commonUtil.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
|
|
|
+ bagNumber := len(bags)
|
|
|
+ if bagNumber > currentTimeWindow.Length {
|
|
|
+ bagNumber = currentTimeWindow.Length
|
|
|
+ bags = bags[0:currentTimeWindow.Length]
|
|
|
+ }
|
|
|
+
|
|
|
// 3 如果不是全量采集,则使用 filter 命令对 bag 包进行主题过滤。
|
|
|
// todo 需要采集回传的话题是根据触发器设置的,触发器设置了之后进行过滤
|
|
|
//if commonConfig.CloudConfig.FullCollect == false {
|
|
@@ -97,29 +103,29 @@ outLoop:
|
|
|
//}
|
|
|
|
|
|
// 5 upload,必须顺序执行
|
|
|
- //c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
|
- //start := time.Now()
|
|
|
- //objectKey1 := commonConfig.OssEquBasePrefix + "data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
+ c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
|
+ start := time.Now()
|
|
|
+ objectKey1 := commonConfig.OssEquBasePrefix + "data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
objectKey2 := commonConfig.OssEquBasePrefix + "data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", currentTimeWindow.Length) + ".bag"
|
|
|
objectKey3 := commonConfig.OssEquBasePrefix + "data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", currentTimeWindow.Length) + "/"
|
|
|
- //for i, bag := range bags {
|
|
|
- // startOne := time.Now()
|
|
|
- // bagSlice := strings.Split(bag, "/")
|
|
|
- // for {
|
|
|
- // {
|
|
|
- // commonConfig.OssMutex.Lock()
|
|
|
- // err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
|
|
|
- // commonConfig.OssMutex.Unlock()
|
|
|
- // if err != nil {
|
|
|
- // c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
|
|
|
- // continue
|
|
|
- // }
|
|
|
- // }
|
|
|
- // c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
|
|
|
- // break
|
|
|
- // }
|
|
|
- //}
|
|
|
- //c_log.GlobalLogger.Info("上传完成,总耗时:", time.Since(start))
|
|
|
+ for i, bag := range bags {
|
|
|
+ startOne := time.Now()
|
|
|
+ bagSlice := strings.Split(bag, "/")
|
|
|
+ for {
|
|
|
+ {
|
|
|
+ commonConfig.OssMutex.Lock()
|
|
|
+ err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
|
|
|
+ commonConfig.OssMutex.Unlock()
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+ c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ c_log.GlobalLogger.Info("上传完成,总耗时:", time.Since(start))
|
|
|
if commonConfig.LocalConfig.Node.Name == "node1" {
|
|
|
// 在上传完成的包目录同级下添加一个目录同名的json
|
|
|
var triggerIds []string
|