move_bag.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
  4. commonService "cicv-data-closedloop/aarch64/kinglong/common/service"
  5. "cicv-data-closedloop/common/config/c_log"
  6. "cicv-data-closedloop/common/domain"
  7. "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "time"
  10. )
  11. // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
  12. func RunTimeWindowProducerQueue() {
  13. c_log.GlobalLogger.Info("生产者队列 - 启动")
  14. for { // 必须串行排队处理
  15. select {
  16. case signal := <-commonService.ChannelKillMove:
  17. if signal == 1 {
  18. commonService.ChannelKillMove <- 1
  19. if len(entity.TimeWindowProducerQueue) == 0 {
  20. commonService.AddKillTimes("4")
  21. return
  22. }
  23. } else { //signal == 2
  24. commonService.AddKillTimes("4")
  25. return
  26. }
  27. default:
  28. }
  29. time.Sleep(time.Duration(1) * time.Second)
  30. if len(entity.TimeWindowProducerQueue) > 0 {
  31. currentTimeWindow := entity.TimeWindowProducerQueue[0]
  32. // 将时间窗口移出准备队列
  33. entity.RemoveHeadOfTimeWindowProducerQueue()
  34. if currentTimeWindow.CanUpload == "yes" {
  35. c_log.GlobalLogger.Info("从节点接收到可上传的timeWindow")
  36. }
  37. if currentTimeWindow.CanUpload == "no" {
  38. c_log.GlobalLogger.Info("从节点接收到不可上传的timeWindow")
  39. }
  40. // 2 timeWindow不可以上传,则将data目录下的数据move到copy目录
  41. bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
  42. for _, bag := range bags {
  43. bagTime := util.GetBagTime(bag)
  44. compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
  45. compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
  46. if compare1 && compare2 {
  47. // 将bag包移动到Copy目录
  48. domain.MoveFromDataToCopy(currentTimeWindow.FaultTime, commonConfig.CloudConfig.BagDataDir, bag, commonConfig.CloudConfig.BagCopyDir)
  49. } else {
  50. if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
  51. // 必须已经生成了窗口之后的包才算窗口结束了
  52. break
  53. }
  54. }
  55. }
  56. // 判断是否可上传
  57. if currentTimeWindow.CanUpload == "yes" {
  58. // 1 timeWindow可以上传
  59. c_log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
  60. // 补充bag包
  61. domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
  62. // 将时间窗口加入运行队列
  63. entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
  64. }
  65. }
  66. }
  67. }