123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- package svc
- import (
- commonConfig "cicv-data-closedloop/kinglong/common/cfg"
- "cicv-data-closedloop/kinglong/common/global"
- "cicv-data-closedloop/kinglong/common/log"
- commonService "cicv-data-closedloop/kinglong/common/svc"
- "cicv-data-closedloop/kinglong/common/util"
- "time"
- )
- // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
- func RunTimeWindowProducerQueue() {
- log.GlobalLogger.Info("------- 生产者队列 - 启动 -------")
- for { // 必须串行排队处理
- select {
- case signal := <-commonService.ChannelKillMove:
- if signal == 1 {
- commonService.ChannelKillMove <- 1
- if len(global.TimeWindowProducerQueue) == 0 {
- commonService.AddKillTimes("4")
- return
- }
- } else { //signal == 2
- commonService.AddKillTimes("4")
- return
- }
- default:
- }
- time.Sleep(time.Duration(1) * time.Second)
- if len(global.TimeWindowProducerQueue) > 0 {
- currentTimeWindow := global.TimeWindowProducerQueue[0]
- // 将时间窗口移出准备队列
- util.RemoveHeadOfdTimeWindowProducerQueue()
- if currentTimeWindow.CanUpload == "yes" {
- log.GlobalLogger.Info("接收到的可上传的timeWindow")
- }
- if currentTimeWindow.CanUpload == "no" {
- log.GlobalLogger.Info("接收到的不可上传的timeWindow")
- }
- // 2 timeWindow不可以上传,则将data目录下的数据move到copy目录
- bags := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
- for _, bag := range bags {
- bagTime := util.GetBagTime(bag)
- compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
- compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
- if compare1 && compare2 {
- // 将bag包移动到Copy目录
- util.MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
- } else {
- if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
- // 必须已经生成了窗口之后的包才算窗口结束了
- break
- }
- }
- }
- // 判断是否可上传
- if currentTimeWindow.CanUpload == "yes" {
- // 1 timeWindow可以上传
- log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
- // 补充bag包
- util.SupplyCopyBags(currentTimeWindow)
- // 将时间窗口加入运行队列
- util.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
- }
- }
- }
- }
|