move_bag.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/jili/common/config"
  4. commonService "cicv-data-closedloop/aarch64/jili/common/service"
  5. "cicv-data-closedloop/common/config/c_log"
  6. "cicv-data-closedloop/common/domain"
  7. "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "time"
  10. )
  11. // 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
  12. func RunTimeWindowProducerQueue() {
  13. c_log.GlobalLogger.Info("生产者队列 - 启动")
  14. for { // 必须串行排队处理
  15. select {
  16. case signal := <-commonService.ChannelKillMove:
  17. c_log.GlobalLogger.Info("生产者队列接收到自杀信号", signal)
  18. if signal == 1 {
  19. commonService.ChannelKillMove <- 1
  20. if len(entity.TimeWindowProducerQueue) == 0 {
  21. commonService.AddKillTimes("4")
  22. return
  23. }
  24. } else { //signal == 2
  25. commonService.AddKillTimes("4")
  26. return
  27. }
  28. default:
  29. }
  30. time.Sleep(time.Duration(1) * time.Second)
  31. if len(entity.TimeWindowProducerQueue) > 0 {
  32. currentTimeWindow := entity.TimeWindowProducerQueue[0]
  33. // 将时间窗口移出准备队列
  34. entity.RemoveHeadOfTimeWindowProducerQueue()
  35. if currentTimeWindow.CanUpload == "yes" {
  36. c_log.GlobalLogger.Info("从节点接收到可上传的timeWindow")
  37. }
  38. if currentTimeWindow.CanUpload == "no" {
  39. c_log.GlobalLogger.Info("从节点接收到不可上传的timeWindow")
  40. }
  41. // 2 timeWindow不可以上传,则将data目录下的数据move到copy目录
  42. bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
  43. for _, bag := range bags {
  44. bagTime := util.GetBagTime(bag)
  45. compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
  46. compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
  47. if compare1 && compare2 {
  48. // 将bag包移动到Copy目录
  49. domain.MoveFromDataToCopy(currentTimeWindow.FaultTime, commonConfig.CloudConfig.BagDataDir, bag, commonConfig.CloudConfig.BagCopyDir)
  50. } else {
  51. if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
  52. // 必须已经生成了窗口之后的包才算窗口结束了
  53. break
  54. }
  55. }
  56. }
  57. // 判断是否可上传
  58. if currentTimeWindow.CanUpload == "yes" {
  59. // 1 timeWindow可以上传
  60. c_log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
  61. // 补充bag包
  62. //domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
  63. // 将时间窗口加入运行队列
  64. entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
  65. }
  66. }
  67. }
  68. }