rosbag_upload.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package svc
  2. import (
  3. "fmt"
  4. cfg2 "main/kinglong/common/cfg"
  5. "main/kinglong/common/global"
  6. "main/kinglong/common/log"
  7. "main/kinglong/common/util"
  8. masterCfg "main/kinglong/master/pkg/cfg"
  9. "os"
  10. "strings"
  11. "time"
  12. )
  13. func RunTimeWindowConsumerQueue(nodeName string) {
  14. log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
  15. outLoop:
  16. for { // 串行处理
  17. // 收到自杀信号
  18. signal := <-ChannelKillConsume
  19. if signal == 1 {
  20. ChannelKillConsume <- 1
  21. if len(global.TimeWindowConsumerQueue) == 0 {
  22. AddKillTimes("5")
  23. return
  24. }
  25. } else { //signal == 2
  26. AddKillTimes("5")
  27. return
  28. }
  29. if len(global.TimeWindowConsumerQueue) > 0 {
  30. // 1 获取即将处理的窗口
  31. currentTimeWindow := global.TimeWindowConsumerQueue[0]
  32. util.RemoveHeaOfdTimeWindowConsumerQueue()
  33. log.GlobalLogger.Info("处理窗口:", currentTimeWindow.FaultTime, " - 开始")
  34. log.GlobalLogger.Info("【FaultTime】=", currentTimeWindow.FaultTime)
  35. log.GlobalLogger.Info("【Labels】=", currentTimeWindow.Labels)
  36. log.GlobalLogger.Info("【TriggerIds】=", currentTimeWindow.TriggerIds)
  37. // 2 获取目录
  38. dir := util.GetCopyDir(currentTimeWindow.FaultTime)
  39. bags := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
  40. bagNumber := len(bags)
  41. if bagNumber > currentTimeWindow.Length {
  42. bagNumber = currentTimeWindow.Length
  43. bags = bags[0 : currentTimeWindow.Length-1]
  44. }
  45. // 3 filter包,必须顺序执行
  46. var filterTopics []string
  47. if nodeName == cfg2.CloudConfig.Hosts[0].Name {
  48. filterTopics = currentTimeWindow.MasterTopics
  49. } else {
  50. filterTopics = currentTimeWindow.SlaveTopics
  51. }
  52. var topicsFilterSlice []string
  53. for _, topic := range filterTopics {
  54. topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
  55. }
  56. for _, bag := range bags {
  57. oldName := bag
  58. newName := bag + "_filter"
  59. var command []string
  60. command = append(command, "filter")
  61. command = append(command, oldName)
  62. command = append(command, newName)
  63. command = append(command, "\""+strings.Join(topicsFilterSlice, " or ")+"\"")
  64. log.GlobalLogger.Info("执行bag包过滤命令:", command)
  65. //util2.Execute("rosbag", command...)
  66. log.GlobalLogger.Info("filter bag包:", oldName)
  67. // 删除旧文件
  68. util.DeleteFile(oldName)
  69. // 将新文件改回旧文件名
  70. err := os.Rename(newName, oldName)
  71. if err != nil {
  72. log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
  73. continue outLoop
  74. }
  75. }
  76. // 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
  77. log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
  78. for _, bag := range bags {
  79. oldName := bag
  80. util.Execute("rosbag", "compress", "--bz2", oldName)
  81. }
  82. // 5 upload,必须顺序执行
  83. log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
  84. start := time.Now()
  85. objectKey1 := cfg2.LocalConfig.OssBasePrefix + cfg2.LocalConfig.EquipmentNo + "/data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
  86. objectKey2 := cfg2.LocalConfig.OssBasePrefix + cfg2.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
  87. objectKey3 := cfg2.LocalConfig.OssBasePrefix + cfg2.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
  88. for i, bag := range bags {
  89. bagSlice := strings.Split(bag, "/")
  90. log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】->【", objectKey1+bagSlice[len(bagSlice)-1], "】")
  91. err := cfg2.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
  92. if err != nil {
  93. log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
  94. continue
  95. }
  96. }
  97. log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
  98. // 在上传完成的包目录同级下添加一个目录同名的json
  99. triggerIds := make([]string, 0)
  100. for _, label := range currentTimeWindow.Labels {
  101. triggerIdToAppend := masterCfg.LabelMapTriggerId[label]
  102. log.GlobalLogger.Info("添加一个【triggerId】=", triggerIdToAppend)
  103. triggerIds = append(triggerIds, triggerIdToAppend)
  104. }
  105. callBackMap := map[string]interface{}{
  106. "dataName": currentTimeWindow.FaultTime,
  107. "dataSize": "", // 由合并程序补充
  108. "equipmentNo": cfg2.LocalConfig.EquipmentNo,
  109. "secretKey": cfg2.LocalConfig.SecretKey,
  110. "rosBagPath": objectKey2,
  111. "filePath": objectKey3,
  112. "taskId": cfg2.PlatformConfig.TaskConfigId,
  113. "triggerId": triggerIds,
  114. }
  115. callBackJson, err := util.MapToJsonString(callBackMap)
  116. if err != nil {
  117. log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
  118. }
  119. err = cfg2.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
  120. if err != nil {
  121. log.GlobalLogger.Error("上传callback.json", callBackJson, "失败:", err)
  122. }
  123. // 删除本地所有已上传的bag文件
  124. util.RemoveDir(dir)
  125. log.GlobalLogger.Info("处理窗口:", currentTimeWindow.FaultTime, " - 结束")
  126. }
  127. // 每一秒扫一次
  128. time.Sleep(time.Duration(1) * time.Second)
  129. }
  130. }