12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- package svc
- import (
- commonCfg "cicv-data-closedloop/kinglong/common/cfg"
- "cicv-data-closedloop/kinglong/common/global"
- "cicv-data-closedloop/kinglong/common/log"
- "cicv-data-closedloop/kinglong/common/util"
- "time"
- )
- // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
- func RunTimeWindowProducerQueue() {
- log.GlobalLogger.Info("------- 生产者队列 - 启动 -------")
- for { // 必须串行排队处理
- //TODO 测试更新任务时放开
- //select {
- //case signal := <-commonSvc.ChannelKillTcp:
- // if signal == 1 {
- // defer commonSvc.AddKillTimes()
- // slaveCfg.TcpListener.Close()
- // return
- // }
- //default: // 添加default语句块防止
- //}
- time.Sleep(time.Duration(1) * time.Second)
- if len(global.TimeWindowProducerQueue) > 0 {
- currentTimeWindow := global.TimeWindowProducerQueue[0]
- // 将时间窗口移出准备队列
- util.RemoveHeadOfdTimeWindowProducerQueue()
- if currentTimeWindow.CanUpload == "yes" {
- log.GlobalLogger.Info("接收到的可上传的timeWindow")
- }
- if currentTimeWindow.CanUpload == "no" {
- log.GlobalLogger.Info("接收到的不可上传的timeWindow")
- }
- // 2 timeWindow不可以上传,则将data目录下的数据move到copy目录
- bags := util.ListAbsolutePathWithSuffixAndSort(commonCfg.CloudConfig.BagDataDir, ".bag")
- for _, bag := range bags {
- bagTime := util.GetBagTime(bag)
- compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
- compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
- if compare1 && compare2 {
- // 将bag包移动到Copy目录
- util.MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
- } else {
- if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
- // 必须已经生成了窗口之后的包才算窗口结束了
- break
- }
- }
- }
- // 判断是否可上传
- if currentTimeWindow.CanUpload == "yes" {
- // 1 timeWindow可以上传
- log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
- // 补充bag包
- util.SupplyCopyBags(currentTimeWindow)
- // 将时间窗口加入运行队列
- util.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
- }
- }
- }
- }
|