rosbag_upload.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjibot_delivery/common/config"
  4. masterConfig "cicv-data-closedloop/aarch64/pjibot_delivery/master/package/config"
  5. "cicv-data-closedloop/common/config/c_log"
  6. "cicv-data-closedloop/common/domain"
  7. "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. commonUtil "cicv-data-closedloop/common/util"
  10. "encoding/json"
  11. "fmt"
  12. "os"
  13. "strings"
  14. "time"
  15. )
  16. func RunTimeWindowConsumerQueue(nodeName string) {
  17. c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
  18. outLoop:
  19. for {
  20. // 收到自杀信号
  21. select {
  22. case signal := <-ChannelKillConsume:
  23. if signal == 1 {
  24. ChannelKillConsume <- 1
  25. if len(entity.TimeWindowConsumerQueue) == 0 {
  26. AddKillTimes("5")
  27. return
  28. }
  29. } else { //signal == 2
  30. AddKillTimes("5")
  31. return
  32. }
  33. default:
  34. }
  35. // 每一秒扫一次
  36. time.Sleep(time.Duration(1) * time.Second)
  37. waitLength := len(entity.TimeWindowConsumerQueue)
  38. if waitLength == 0 {
  39. continue outLoop
  40. }
  41. // 1 获取即将处理的窗口
  42. currentTimeWindow := entity.TimeWindowConsumerQueue[0]
  43. entity.RemoveHeadOfTimeWindowConsumerQueue()
  44. c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
  45. // 2 获取目录
  46. dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
  47. bags, _ := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
  48. bagNumber := len(bags)
  49. if bagNumber > currentTimeWindow.Length {
  50. bagNumber = currentTimeWindow.Length
  51. bags = bags[0:currentTimeWindow.Length]
  52. }
  53. // 3 如果不是全量采集,则使用 filter 命令对 bag 包进行主题过滤。
  54. if commonConfig.CloudConfig.FullCollect == false {
  55. var filterTopics []string
  56. if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
  57. filterTopics = currentTimeWindow.MasterTopics
  58. } else {
  59. filterTopics = currentTimeWindow.SlaveTopics
  60. }
  61. var topicsFilterSlice []string
  62. for _, topic := range filterTopics {
  63. topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
  64. }
  65. for i, bag := range bags {
  66. oldName := bag
  67. newName := bag + "_filter"
  68. filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
  69. _, output, err := util.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, filterCommand...)
  70. c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
  71. if err != nil {
  72. c_log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
  73. continue
  74. }
  75. // 删除旧文件
  76. util.DeleteFile(oldName)
  77. // 将新文件改回旧文件名
  78. if err = os.Rename(newName, oldName); err != nil {
  79. c_log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
  80. continue outLoop
  81. }
  82. }
  83. }
  84. // 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
  85. // 5 todo 机器人去掉压缩过程,防止cpu跑满
  86. //c_log.GlobalLogger.Info("压缩 bag 数据包,故障时间为:", currentTimeWindow.FaultTime)
  87. //for i, bag := range bags {
  88. // oldName := bag
  89. // compressCommand := []string{"compress", "--bz2", oldName}
  90. // c_log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
  91. // if _, output, err := util.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, compressCommand...); err != nil {
  92. // c_log.GlobalLogger.Errorf("compress命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
  93. // continue
  94. // }
  95. //}
  96. // 5 upload,必须顺序执行
  97. c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
  98. start := time.Now()
  99. objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
  100. objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
  101. objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
  102. for i, bag := range bags {
  103. startOne := time.Now()
  104. bagSlice := strings.Split(bag, "/")
  105. for {
  106. commonConfig.OssMutex.Lock()
  107. err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
  108. commonConfig.OssMutex.Unlock()
  109. if err != nil {
  110. c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
  111. continue
  112. }
  113. c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
  114. break
  115. }
  116. }
  117. c_log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
  118. // 在上传完成的包目录同级下添加一个目录同名的json
  119. triggerIds := make([]string, 0)
  120. for _, label := range currentTimeWindow.Labels {
  121. if value, ok := masterConfig.LabelMapTriggerId.Load(label); ok {
  122. triggerIds = append(triggerIds, value.(string))
  123. }
  124. }
  125. callBackMap := map[string]interface{}{
  126. "dataName": currentTimeWindow.FaultTime, // 云端callback程序会将该值加8小时,因为UTC和CSV时区相差8小时
  127. "dataSize": "", // 由合并程序补充
  128. "equipmentNo": commonConfig.LocalConfig.EquipmentNo,
  129. "secretKey": commonConfig.LocalConfig.SecretKey,
  130. "rosBagPath": objectKey2,
  131. "filePath": objectKey3,
  132. "taskId": commonConfig.PlatformConfig.TaskConfigId,
  133. "triggerId": triggerIds,
  134. }
  135. callBackJson, err := util.MapToJsonString(callBackMap)
  136. if err != nil {
  137. c_log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
  138. }
  139. commonConfig.OssMutex.Lock()
  140. err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
  141. commonConfig.OssMutex.Unlock()
  142. if err != nil {
  143. c_log.GlobalLogger.Error("上传 callback.json 文件失败:", err)
  144. }
  145. // 数据库中采集数量加一
  146. collectNumPlus()
  147. // 删除本地所有已上传的bag文件
  148. c_log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
  149. c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
  150. if err = util.RemoveDir(dir); err != nil {
  151. continue outLoop
  152. }
  153. if len(entity.TimeWindowConsumerQueue) == 0 {
  154. c_log.GlobalLogger.Infof("已处理所有窗口,重启 record 命令。")
  155. ChannelKillRosRecord <- 2
  156. entity.ProcessingFlag = false
  157. }
  158. }
  159. }
  160. func collectNumPlus() {
  161. responseString, err := commonUtil.HttpPostJsonWithHeaders(
  162. commonConfig.CloudConfig.CollectNumPlus.Url,
  163. map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
  164. map[string]string{
  165. "snCode": commonConfig.LocalConfig.SecretKey,
  166. },
  167. )
  168. if err != nil {
  169. c_log.GlobalLogger.Error("发送http请求修改采集数量失败:", err)
  170. }
  171. // 解析JSON字符串到Response结构体
  172. var resp entity.Response
  173. err = json.Unmarshal([]byte(responseString), &resp)
  174. if err != nil {
  175. c_log.GlobalLogger.Error("解析修改采集数量结果失败:", err)
  176. }
  177. }