// rosbag_upload.go (6.8 KB)
  1. package svc
  2. import (
  3. commonConfig "cicv-data-closedloop/kinglong/common/cfg"
  4. "cicv-data-closedloop/kinglong/common/global"
  5. "cicv-data-closedloop/kinglong/common/log"
  6. "cicv-data-closedloop/kinglong/common/util"
  7. masterConfig "cicv-data-closedloop/kinglong/master/pkg/cfg"
  8. "fmt"
  9. "os"
  10. "strings"
  11. "time"
  12. )
  13. func RunTimeWindowConsumerQueue(nodeName string) {
  14. log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
  15. outLoop:
  16. for { // 串行处理
  17. // 收到自杀信号
  18. select {
  19. case signal := <-ChannelKillConsume:
  20. if signal == 1 {
  21. ChannelKillConsume <- 1
  22. if len(global.TimeWindowConsumerQueue) == 0 {
  23. AddKillTimes("5")
  24. return
  25. }
  26. } else { //signal == 2
  27. AddKillTimes("5")
  28. return
  29. }
  30. default:
  31. }
  32. // 每一秒扫一次
  33. time.Sleep(time.Duration(1) * time.Second)
  34. waitLength := len(global.TimeWindowConsumerQueue)
  35. if waitLength == 0 {
  36. continue outLoop
  37. }
  38. log.GlobalLogger.Infof("待处理窗口个数为:%v", len(global.TimeWindowConsumerQueue))
  39. // 1 获取即将处理的窗口
  40. currentTimeWindow := global.TimeWindowConsumerQueue[0]
  41. util.RemoveHeaOfdTimeWindowConsumerQueue()
  42. log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
  43. // 2 获取目录
  44. dir := util.GetCopyDir(currentTimeWindow.FaultTime)
  45. bags := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
  46. bagNumber := len(bags)
  47. if bagNumber > currentTimeWindow.Length {
  48. bagNumber = currentTimeWindow.Length
  49. bags = bags[0:currentTimeWindow.Length]
  50. }
  51. // 3 如果不是全量采集,则使用 filter 命令对 bag 包进行主题过滤。
  52. if commonConfig.CloudConfig.FullCollect == false {
  53. var filterTopics []string
  54. if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
  55. filterTopics = currentTimeWindow.MasterTopics
  56. } else {
  57. filterTopics = currentTimeWindow.SlaveTopics
  58. }
  59. var topicsFilterSlice []string
  60. for _, topic := range filterTopics {
  61. topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
  62. }
  63. for i, bag := range bags {
  64. oldName := bag
  65. newName := bag + "_filter"
  66. filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
  67. _, output, err := util.Execute("rosbag", filterCommand...)
  68. log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
  69. if err != nil {
  70. log.GlobalLogger.Errorf("filter 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
  71. continue
  72. }
  73. // 删除旧文件
  74. util.DeleteFile(oldName)
  75. // 将新文件改回旧文件名
  76. if err = os.Rename(newName, oldName); err != nil {
  77. log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
  78. continue outLoop
  79. }
  80. }
  81. }
  82. // 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
  83. log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
  84. for i, bag := range bags {
  85. oldName := bag
  86. compressCommand := []string{"compress", "--bz2", oldName}
  87. log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
  88. if _, output, err := util.Execute("rosbag", compressCommand...); err != nil {
  89. log.GlobalLogger.Errorf("compress 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
  90. continue
  91. }
  92. }
  93. // 5 upload,必须顺序执行
  94. log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
  95. start := time.Now()
  96. objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
  97. objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
  98. objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
  99. for i, bag := range bags {
  100. bagSlice := strings.Split(bag, "/")
  101. log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
  102. err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
  103. if err != nil {
  104. log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
  105. continue
  106. }
  107. }
  108. log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
  109. // 在上传完成的包目录同级下添加一个目录同名的json
  110. triggerIds := make([]string, 0)
  111. for _, label := range currentTimeWindow.Labels {
  112. triggerIdToAppend := masterConfig.LabelMapTriggerId[label]
  113. log.GlobalLogger.Info("添加一个【triggerId】=", triggerIdToAppend)
  114. triggerIds = append(triggerIds, triggerIdToAppend)
  115. }
  116. log.GlobalLogger.Info("json 中添加【triggerIds】=", triggerIds)
  117. callBackMap := map[string]interface{}{
  118. "dataName": currentTimeWindow.FaultTime,
  119. "dataSize": "", // 由合并程序补充
  120. "equipmentNo": commonConfig.LocalConfig.EquipmentNo,
  121. "secretKey": commonConfig.LocalConfig.SecretKey,
  122. "rosBagPath": objectKey2,
  123. "filePath": objectKey3,
  124. "taskId": commonConfig.PlatformConfig.TaskConfigId,
  125. "triggerId": triggerIds,
  126. }
  127. callBackJson, err := util.MapToJsonString(callBackMap)
  128. log.GlobalLogger.Info("【callBackJson】=", callBackJson)
  129. if err != nil {
  130. log.GlobalLogger.Error("callBackMap", callBackMap, "转 json 失败:", err)
  131. }
  132. err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
  133. if err != nil {
  134. log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
  135. }
  136. // 删除本地所有已上传的bag文件
  137. log.GlobalLogger.Infof("结束处理窗口,【Label】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
  138. if err = util.RemoveDir(dir); err != nil {
  139. continue outLoop
  140. }
  141. }
  142. }