package service

import (
	"time"

	commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
	commonService "cicv-data-closedloop/aarch64/pjisuv/common/service"
	"cicv-data-closedloop/common/config/c_log"
	"cicv-data-closedloop/common/domain"
	"cicv-data-closedloop/common/entity"
	"cicv-data-closedloop/common/util"
)

// RunTimeWindowProducerQueue drains entity.TimeWindowProducerQueue one time
// window at a time. For each window it moves every ".bag" file in the data
// directory whose bag time lies inside [TimeWindowBegin, TimeWindowEnd] to the
// copy directory and, when the window is marked CanUpload == "yes", hands the
// window over to the consumer queue.
//
// The loop runs until a kill signal arrives on commonService.ChannelKillMove:
//   - signal 1: the signal is put back on the channel and the function returns
//     only once the producer queue is empty;
//   - any other signal (2 in practice): the function returns immediately.
//
// In both exit paths commonService.AddKillTimes("4") is called first.
func RunTimeWindowProducerQueue() {
	c_log.GlobalLogger.Info("生产者队列 - 启动")
	for {
		// Windows must be handled serially, one per iteration.
		// The kill channel is polled without blocking so queue processing
		// keeps going while no signal is pending.
		select {
		case signal := <-commonService.ChannelKillMove:
			c_log.GlobalLogger.Info("生产者队列接收到自杀信号", signal)
			if signal != 1 {
				// signal == 2: stop right away.
				commonService.AddKillTimes("4")
				return
			}
			// Put the signal back so it is seen again on a later poll.
			commonService.ChannelKillMove <- 1
			if len(entity.TimeWindowProducerQueue) == 0 {
				commonService.AddKillTimes("4")
				return
			}
		default:
		}

		time.Sleep(time.Second)
		if len(entity.TimeWindowProducerQueue) == 0 {
			continue
		}

		// Take the head window and remove it from the producer queue.
		currentTimeWindow := entity.TimeWindowProducerQueue[0]
		entity.RemoveHeadOfTimeWindowProducerQueue()
		switch currentTimeWindow.CanUpload {
		case "yes":
			c_log.GlobalLogger.Info("从节点接收到可上传的timeWindow")
		case "no":
			c_log.GlobalLogger.Info("从节点接收到不可上传的timeWindow")
		}

		// Move the bags that belong to this window from the data directory
		// to the copy directory (done whether or not the window is uploadable).
		bags, err := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
		if err != nil {
			// Previously this error was dropped silently. Processing continues
			// with whatever was returned, as before, but the failure is now
			// visible in the log. Info is used because it is the only logger
			// method this file is known to have.
			c_log.GlobalLogger.Info("生产者队列列出bag包失败:", err)
		}
		for _, bag := range bags {
			bagTime := util.GetBagTime(bag)
			notBeforeBegin := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
			notAfterEnd := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
			if notBeforeBegin && notAfterEnd {
				// Bag lies inside the window: move it to the copy directory.
				domain.MoveFromDataToCopy(currentTimeWindow.FaultTime, commonConfig.CloudConfig.BagDataDir, bag, commonConfig.CloudConfig.BagCopyDir)
			} else if notBeforeBegin {
				// A bag past the window end exists, so the window is over.
				// NOTE(review): stopping here presumes the listing is sorted
				// by bag time — confirm against ListAbsolutePathWithSuffixAndSort.
				break
			}
		}

		// Only uploadable windows are forwarded to the consumer queue.
		if currentTimeWindow.CanUpload == "yes" {
			c_log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
			// Supplementing missing bags is currently disabled:
			//domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
			entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
		}
	}
}