package svc import ( commonConfig "cicv-data-closedloop/kinglong/common/cfg" "cicv-data-closedloop/kinglong/common/global" "cicv-data-closedloop/kinglong/common/log" commonService "cicv-data-closedloop/kinglong/common/svc" "cicv-data-closedloop/kinglong/common/util" "time" ) // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传 func RunTimeWindowProducerQueue() { log.GlobalLogger.Info("------- 生产者队列 - 启动 -------") for { // 必须串行排队处理 select { case signal := <-commonService.ChannelKillMove: if signal == 1 { commonService.ChannelKillMove <- 1 if len(global.TimeWindowProducerQueue) == 0 { commonService.AddKillTimes("4") return } } else { //signal == 2 commonService.AddKillTimes("4") return } default: } time.Sleep(time.Duration(1) * time.Second) if len(global.TimeWindowProducerQueue) > 0 { currentTimeWindow := global.TimeWindowProducerQueue[0] // 将时间窗口移出准备队列 util.RemoveHeadOfdTimeWindowProducerQueue() if currentTimeWindow.CanUpload == "yes" { log.GlobalLogger.Info("接收到的可上传的timeWindow") } if currentTimeWindow.CanUpload == "no" { log.GlobalLogger.Info("接收到的不可上传的timeWindow") } // 2 timeWindow不可以上传,则将data目录下的数据move到copy目录 bags := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag") for _, bag := range bags { bagTime := util.GetBagTime(bag) compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd) if compare1 && compare2 { // 将bag包移动到Copy目录 util.MoveFromDataToCopy(currentTimeWindow.FaultTime, bag) } else { if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) { // 必须已经生成了窗口之后的包才算窗口结束了 break } } } // 判断是否可上传 if currentTimeWindow.CanUpload == "yes" { // 1 timeWindow可以上传 log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow) // 补充bag包 util.SupplyCopyBags(currentTimeWindow) // 将时间窗口加入运行队列 util.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow) } } } }