rosbag_upload.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjibot_guide/common/config"
  4. masterConfig "cicv-data-closedloop/aarch64/pjibot_guide/master/package/config"
  5. masterService "cicv-data-closedloop/aarch64/pjibot_guide/master/package/service"
  6. "cicv-data-closedloop/common/config/c_log"
  7. "cicv-data-closedloop/common/domain"
  8. "cicv-data-closedloop/common/entity"
  9. "cicv-data-closedloop/common/util"
  10. commonUtil "cicv-data-closedloop/common/util"
  11. "encoding/json"
  12. "fmt"
  13. "os"
  14. "path/filepath"
  15. "strings"
  16. "time"
  17. )
  18. func RunTimeWindowConsumerQueue(nodeName string) {
  19. c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
  20. outLoop:
  21. for {
  22. // 收到自杀信号
  23. select {
  24. case signal := <-ChannelKillConsume:
  25. if signal == 1 {
  26. ChannelKillConsume <- 1
  27. if len(entity.TimeWindowConsumerQueue) == 0 {
  28. AddKillTimes("5")
  29. return
  30. }
  31. } else { //signal == 2
  32. AddKillTimes("5")
  33. return
  34. }
  35. default:
  36. }
  37. // 每一秒扫一次
  38. time.Sleep(time.Duration(1) * time.Second)
  39. waitLength := len(entity.TimeWindowConsumerQueue)
  40. if waitLength == 0 {
  41. continue outLoop
  42. }
  43. // 1 获取即将处理的窗口
  44. currentTimeWindow := entity.TimeWindowConsumerQueue[0]
  45. entity.RemoveHeadOfTimeWindowConsumerQueue()
  46. c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
  47. // 2 获取目录
  48. dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
  49. bags, _ := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
  50. bagNumber := len(bags)
  51. if bagNumber > currentTimeWindow.Length {
  52. bagNumber = currentTimeWindow.Length
  53. bags = bags[0:currentTimeWindow.Length]
  54. }
  55. // 3 如果不是全量采集,则使用 filter 命令对 bag 包进行主题过滤。
  56. if commonConfig.CloudConfig.FullCollect == false {
  57. var filterTopics []string
  58. if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
  59. filterTopics = currentTimeWindow.MasterTopics
  60. } else {
  61. filterTopics = currentTimeWindow.SlaveTopics
  62. }
  63. var topicsFilterSlice []string
  64. for _, topic := range filterTopics {
  65. topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
  66. }
  67. for i, bag := range bags {
  68. oldName := bag
  69. newName := bag + "_filter"
  70. filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
  71. _, output, err := util.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, filterCommand...)
  72. c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
  73. if err != nil {
  74. c_log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
  75. continue
  76. }
  77. // 删除旧文件
  78. util.DeleteFile(oldName)
  79. // 将新文件改回旧文件名
  80. if err = os.Rename(newName, oldName); err != nil {
  81. c_log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
  82. continue outLoop
  83. }
  84. }
  85. }
  86. // 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
  87. // 5 todo 机器人去掉压缩过程,防止cpu跑满
  88. //c_log.GlobalLogger.Info("压缩 bag 数据包,故障时间为:", currentTimeWindow.FaultTime)
  89. //for i, bag := range bags {
  90. // oldName := bag
  91. // compressCommand := []string{"compress", "--bz2", oldName}
  92. // c_log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
  93. // if _, output, err := util.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, compressCommand...); err != nil {
  94. // c_log.GlobalLogger.Errorf("compress命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
  95. // continue
  96. // }
  97. //}
  98. // 5 upload,必须顺序执行
  99. c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
  100. start := time.Now()
  101. objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
  102. objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
  103. objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
  104. for i, bag := range bags {
  105. startOne := time.Now()
  106. bagSlice := strings.Split(bag, "/")
  107. for {
  108. commonConfig.OssMutex.Lock()
  109. err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
  110. commonConfig.OssMutex.Unlock()
  111. if err != nil {
  112. c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
  113. continue
  114. }
  115. c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
  116. break
  117. }
  118. }
  119. c_log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
  120. // 在上传完成的包目录同级下添加一个目录同名的json
  121. triggerIds := make([]string, 0)
  122. for _, label := range currentTimeWindow.Labels {
  123. if value, ok := masterConfig.LabelMapTriggerId.Load(label); ok {
  124. triggerIds = append(triggerIds, value.(string))
  125. }
  126. }
  127. callBackMap := map[string]interface{}{
  128. "dataName": currentTimeWindow.FaultTime, // 云端callback程序会将该值加8小时,因为UTC和CSV时区相差8小时
  129. "dataSize": "", // 由合并程序补充
  130. "equipmentNo": commonConfig.LocalConfig.EquipmentNo,
  131. "secretKey": commonConfig.LocalConfig.SecretKey,
  132. "rosBagPath": objectKey2,
  133. "filePath": objectKey3,
  134. "taskId": commonConfig.PlatformConfig.TaskConfigId,
  135. "triggerId": triggerIds,
  136. }
  137. callBackJson, err := util.MapToJsonString(callBackMap)
  138. if err != nil {
  139. c_log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
  140. }
  141. commonConfig.OssMutex.Lock()
  142. // 上传callback.json
  143. err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
  144. if err != nil {
  145. c_log.GlobalLogger.Error("上传 callback.json 文件失败:", err)
  146. }
  147. // 额外采集mapBuf
  148. for _, file := range commonConfig.CloudConfig.MapBufFiles {
  149. err = commonConfig.OssBucket.PutObjectFromFile(objectKey3+filepath.Base(file), file)
  150. if err != nil {
  151. c_log.GlobalLogger.Error("上传 mapBuf 文件失败:", err)
  152. }
  153. }
  154. // 压缩采集data目录
  155. {
  156. // 1 如果 data.zip 已存在,先删除
  157. util.DeleteFileIfExists(commonConfig.CloudConfig.DataDir.Dest)
  158. c_log.GlobalLogger.Infof("旧的data目录压缩包【%v】已删除。", commonConfig.CloudConfig.DataDir.Dest)
  159. // 2 重新压缩升成 data.zip
  160. err = util.ZipDir(commonConfig.CloudConfig.DataDir.Src, commonConfig.CloudConfig.DataDir.Dest, commonConfig.CloudConfig.DataDir.SrcSub)
  161. if err != nil {
  162. c_log.GlobalLogger.Error("压缩data目录失败:", err)
  163. } else {
  164. c_log.GlobalLogger.Infof("压缩data目录【%v】->【%v】成功", commonConfig.CloudConfig.DataDir.Src, commonConfig.CloudConfig.DataDir.Dest)
  165. dataZipKey := objectKey3 + "data.zip"
  166. err = commonConfig.OssBucket.PutObjectFromFile(dataZipKey, commonConfig.CloudConfig.DataDir.Dest)
  167. if err != nil {
  168. c_log.GlobalLogger.Error("上传data目录压缩文件失败:", err)
  169. } else {
  170. c_log.GlobalLogger.Infof("上传data目录压缩包【%v】->【%v】成功", commonConfig.CloudConfig.DataDir.Dest, dataZipKey)
  171. }
  172. }
  173. commonConfig.OssMutex.Unlock()
  174. }
  175. // todo 不压缩采集data目录
  176. {
  177. //var filePaths []string // 初始化一个切片来保存文件路径
  178. //err = filepath.WalkDir(commonConfig.CloudConfig.DataDir.Src, func(path string, d fs.DirEntry, err error) error { // 使用filepath.WalkDir遍历目录
  179. // if err != nil {
  180. // return err // 如果有错误,返回错误
  181. // }
  182. //
  183. // // 检查是否为文件(跳过目录)
  184. // if !d.IsDir() {
  185. // filePaths = append(filePaths, path) // 将文件路径添加到切片中
  186. // }
  187. // return nil
  188. //})
  189. //if err != nil {
  190. // c_log.GlobalLogger.Error("扫描 data 目录失败:", err)
  191. // goto outLoop
  192. //}
  193. //
  194. //// 不压缩上传所有文件
  195. //for _, path := range filePaths {
  196. // if strings.Contains(path, commonConfig.CloudConfig.DataDir.Exclude) {
  197. // continue
  198. // }
  199. // relativePath := strings.Replace(path, commonConfig.CloudConfig.DataDir.Src, "", 1)
  200. // ossKey := objectKey3 + "data/" + relativePath
  201. // err = commonConfig.OssBucket.PutObjectFromFile(ossKey, path)
  202. // if err != nil {
  203. // c_log.GlobalLogger.Errorf("上传 data 目录内文件【%v】->【%v】失败:%v", path, ossKey, err)
  204. // goto outLoop
  205. // }
  206. //}
  207. //commonConfig.OssMutex.Unlock()
  208. }
  209. // 数据库中采集数量加一
  210. collectNumPlus()
  211. // 删除本地所有已上传的bag文件
  212. c_log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
  213. c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
  214. if err = util.RemoveDir(dir); err != nil {
  215. goto outLoop
  216. }
  217. if len(entity.TimeWindowConsumerQueue) == 0 {
  218. c_log.GlobalLogger.Infof("已处理所有窗口,重启 record 命令。")
  219. ChannelKillRosRecord <- 2
  220. masterService.ProcessingFlag = false
  221. }
  222. }
  223. }
  224. func collectNumPlus() {
  225. responseString, err := commonUtil.HttpPostJsonWithHeaders(
  226. commonConfig.CloudConfig.CollectNumPlus.Url,
  227. map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
  228. map[string]string{
  229. "snCode": commonConfig.LocalConfig.SecretKey,
  230. },
  231. )
  232. if err != nil {
  233. c_log.GlobalLogger.Error("发送http请求修改采集数量失败:", err)
  234. }
  235. // 解析JSON字符串到Response结构体
  236. var resp entity.Response
  237. err = json.Unmarshal([]byte(responseString), &resp)
  238. if err != nil {
  239. c_log.GlobalLogger.Error("解析修改采集数量结果失败:", err)
  240. }
  241. }