rosbag_upload.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config"
  5. "cicv-data-closedloop/common/config/c_log"
  6. "cicv-data-closedloop/common/domain"
  7. "cicv-data-closedloop/common/entity"
  8. commonUtil "cicv-data-closedloop/common/util"
  9. "encoding/json"
  10. "fmt"
  11. "strings"
  12. "time"
  13. )
  14. // RunTimeWindowConsumerQueue 依次上传时间窗口
  15. func RunTimeWindowConsumerQueue() {
  16. //nodeName:= commonConfig.LocalConfig.Node.Name
  17. c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
  18. outLoop:
  19. for { // 串行处理
  20. // 收到自杀信号
  21. select {
  22. case signal := <-ChannelKillConsume:
  23. c_log.GlobalLogger.Info("消费者队列接收到自杀信号:", signal)
  24. if signal == 1 {
  25. ChannelKillConsume <- 1
  26. if len(entity.TimeWindowConsumerQueue) == 0 {
  27. AddKillTimes("5")
  28. return
  29. }
  30. } else { //signal == 2
  31. AddKillTimes("5")
  32. return
  33. }
  34. default:
  35. }
  36. time.Sleep(time.Duration(1) * time.Second)
  37. waitLength := len(entity.TimeWindowConsumerQueue)
  38. if waitLength == 0 {
  39. continue outLoop
  40. }
  41. // 1 获取即将处理的窗口
  42. currentTimeWindow := entity.TimeWindowConsumerQueue[0]
  43. entity.RemoveHeadOfTimeWindowConsumerQueue()
  44. c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
  45. // 2 获取目录
  46. dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
  47. //bags, _ := commonUtil.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
  48. //bagNumber := len(bags)
  49. //if bagNumber > currentTimeWindow.Length {
  50. // bagNumber = currentTimeWindow.Length
  51. // bags = bags[0:currentTimeWindow.Length]
  52. //}
  53. //
  54. //if commonConfig.CloudConfig.CompressBag {
  55. // // 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
  56. // c_log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
  57. // for i, bag := range bags {
  58. // oldName := bag
  59. // compressCommand := []string{"compress", "--bz2", oldName}
  60. // c_log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
  61. // if _, output, err := commonUtil.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, compressCommand...); err != nil {
  62. // c_log.GlobalLogger.Errorf("compress 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
  63. // continue
  64. // }
  65. // }
  66. //}
  67. // 5 upload,必须顺序执行
  68. //c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
  69. //start := time.Now()
  70. //objectKey1 := commonConfig.OssEquBasePrefix + "data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
  71. objectKey2 := commonConfig.OssEquBasePrefix + "data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", currentTimeWindow.Length) + ".bag"
  72. objectKey3 := commonConfig.OssEquBasePrefix + "data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", currentTimeWindow.Length) + "/"
  73. //for i, bag := range bags {
  74. // startOne := time.Now()
  75. // bagSlice := strings.Split(bag, "/")
  76. // for {
  77. // {
  78. // commonConfig.OssMutex.Lock()
  79. // err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
  80. // commonConfig.OssMutex.Unlock()
  81. // if err != nil {
  82. // c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
  83. // continue
  84. // }
  85. // }
  86. // c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
  87. // break
  88. // }
  89. //}
  90. //c_log.GlobalLogger.Info("上传完成,总耗时:", time.Since(start))
  91. if commonConfig.LocalConfig.Node.Name == "node1" {
  92. // 在上传完成的包目录同级下添加一个目录同名的json
  93. var triggerIds []string
  94. for _, label := range currentTimeWindow.Labels {
  95. if triggerId, ok := masterConfig.LabelMapTriggerId.Load(label); !ok {
  96. c_log.GlobalLogger.Errorf("【label】=%v 没有对应的【triggerId】", label)
  97. } else {
  98. //c_log.GlobalLogger.Info("添加一个【triggerId】=", triggerId)
  99. triggerIds = append(triggerIds, commonUtil.ToString(triggerId))
  100. }
  101. }
  102. //c_log.GlobalLogger.Info("json 中添加【triggerIds】=", triggerIds)
  103. callBackJsonBytes, _ := json.MarshalIndent(&entity.CallBack{
  104. DataName: currentTimeWindow.FaultTime,
  105. DataSize: "", // 由合并程序补充
  106. EquipmentNo: commonConfig.LocalConfig.EquipmentNo,
  107. SecretKey: commonConfig.LocalConfig.SecretKey,
  108. RosBagPath: objectKey2,
  109. FilePath: objectKey3,
  110. TaskId: commonConfig.PlatformConfig.TaskConfigId,
  111. TriggerId: triggerIds,
  112. }, "", " ")
  113. callBackJson := string(callBackJsonBytes)
  114. c_log.GlobalLogger.Info("【callBackJson】=", callBackJson)
  115. {
  116. //commonConfig.OssMutex.Lock()
  117. //err := commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
  118. //commonConfig.OssMutex.Unlock()
  119. //if err != nil {
  120. // c_log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
  121. //}
  122. // 删除本地所有已上传的bag文件
  123. //if err := commonUtil.RemoveDir(dir); err != nil {
  124. // continue outLoop
  125. //}
  126. commonUtil.WriteStringToFile(callBackJson, dir+"callback.json")
  127. }
  128. }
  129. c_log.GlobalLogger.Infof("结束处理窗口,【Label】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
  130. c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
  131. }
  132. }