move_bag.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package svc
  2. import (
  3. commonConfig "cicv-data-closedloop/kinglong/common/cfg"
  4. "cicv-data-closedloop/kinglong/common/global"
  5. "cicv-data-closedloop/kinglong/common/log"
  6. commonService "cicv-data-closedloop/kinglong/common/svc"
  7. "cicv-data-closedloop/kinglong/common/util"
  8. "time"
  9. )
  10. // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
  11. func RunTimeWindowProducerQueue() {
  12. log.GlobalLogger.Info("生产者队列 - 启动")
  13. for { // 必须串行排队处理
  14. select {
  15. case signal := <-commonService.ChannelKillMove:
  16. if signal == 1 {
  17. commonService.ChannelKillMove <- 1
  18. if len(global.TimeWindowProducerQueue) == 0 {
  19. commonService.AddKillTimes("4")
  20. return
  21. }
  22. } else { //signal == 2
  23. commonService.AddKillTimes("4")
  24. return
  25. }
  26. default:
  27. }
  28. time.Sleep(time.Duration(1) * time.Second)
  29. if len(global.TimeWindowProducerQueue) > 0 {
  30. currentTimeWindow := global.TimeWindowProducerQueue[0]
  31. // 将时间窗口移出准备队列
  32. util.RemoveHeadOfdTimeWindowProducerQueue()
  33. if currentTimeWindow.CanUpload == "yes" {
  34. log.GlobalLogger.Info("从节点接收到可上传的timeWindow")
  35. }
  36. if currentTimeWindow.CanUpload == "no" {
  37. log.GlobalLogger.Info("从节点接收到不可上传的timeWindow")
  38. }
  39. // 2 timeWindow不可以上传,则将data目录下的数据move到copy目录
  40. bags := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
  41. for _, bag := range bags {
  42. bagTime := util.GetBagTime(bag)
  43. compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
  44. compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
  45. if compare1 && compare2 {
  46. // 将bag包移动到Copy目录
  47. util.MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
  48. } else {
  49. if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
  50. // 必须已经生成了窗口之后的包才算窗口结束了
  51. break
  52. }
  53. }
  54. }
  55. // 判断是否可上传
  56. if currentTimeWindow.CanUpload == "yes" {
  57. // 1 timeWindow可以上传
  58. log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
  59. // 补充bag包
  60. util.SupplyCopyBags(currentTimeWindow)
  61. // 将时间窗口加入运行队列
  62. util.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
  63. }
  64. }
  65. }
  66. }