move_bag.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package svc
  2. import (
  3. commonCfg "cicv-data-closedloop/kinglong/common/cfg"
  4. "cicv-data-closedloop/kinglong/common/global"
  5. "cicv-data-closedloop/kinglong/common/log"
  6. "cicv-data-closedloop/kinglong/common/util"
  7. "time"
  8. )
  9. // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
  10. func RunTimeWindowProducerQueue() {
  11. log.GlobalLogger.Info("------- 生产者队列 - 启动 -------")
  12. for { // 必须串行排队处理
  13. //TODO 测试更新任务时放开
  14. //select {
  15. //case signal := <-commonSvc.ChannelKillTcp:
  16. // if signal == 1 {
  17. // defer commonSvc.AddKillTimes()
  18. // slaveCfg.TcpListener.Close()
  19. // return
  20. // }
  21. //default: // 添加default语句块防止
  22. //}
  23. time.Sleep(time.Duration(1) * time.Second)
  24. if len(global.TimeWindowProducerQueue) > 0 {
  25. currentTimeWindow := global.TimeWindowProducerQueue[0]
  26. // 将时间窗口移出准备队列
  27. util.RemoveHeadOfdTimeWindowProducerQueue()
  28. if currentTimeWindow.CanUpload == "yes" {
  29. log.GlobalLogger.Info("接收到的可上传的timeWindow")
  30. }
  31. if currentTimeWindow.CanUpload == "no" {
  32. log.GlobalLogger.Info("接收到的不可上传的timeWindow")
  33. }
  34. // 2 timeWindow不可以上传,则将data目录下的数据move到copy目录
  35. bags := util.ListAbsolutePathWithSuffixAndSort(commonCfg.CloudConfig.BagDataDir, ".bag")
  36. for _, bag := range bags {
  37. bagTime := util.GetBagTime(bag)
  38. compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
  39. compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
  40. if compare1 && compare2 {
  41. // 将bag包移动到Copy目录
  42. util.MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
  43. } else {
  44. if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
  45. // 必须已经生成了窗口之后的包才算窗口结束了
  46. break
  47. }
  48. }
  49. }
  50. // 判断是否可上传
  51. if currentTimeWindow.CanUpload == "yes" {
  52. // 1 timeWindow可以上传
  53. log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
  54. // 补充bag包
  55. util.SupplyCopyBags(currentTimeWindow)
  56. // 将时间窗口加入运行队列
  57. util.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
  58. }
  59. }
  60. }
  61. }