1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- package service
- import (
- commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
- commonService "cicv-data-closedloop/aarch64/pjisuv/common/service"
- "cicv-data-closedloop/common/config/c_log"
- "cicv-data-closedloop/common/domain"
- "cicv-data-closedloop/common/entity"
- "cicv-data-closedloop/common/util"
- "time"
- )
- // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
- func RunTimeWindowProducerQueue() {
- c_log.GlobalLogger.Info("生产者队列 - 启动")
- for { // 必须串行排队处理
- select {
- case signal := <-commonService.ChannelKillMove:
- c_log.GlobalLogger.Info("生产者队列接收到自杀信号", signal)
- if signal == 1 {
- commonService.ChannelKillMove <- 1
- if len(entity.TimeWindowProducerQueue) == 0 {
- commonService.AddKillTimes("4")
- return
- }
- } else { //signal == 2
- commonService.AddKillTimes("4")
- return
- }
- default:
- }
- time.Sleep(time.Duration(1) * time.Second)
- if len(entity.TimeWindowProducerQueue) > 0 {
- currentTimeWindow := entity.TimeWindowProducerQueue[0]
- // 将时间窗口移出准备队列
- entity.RemoveHeadOfTimeWindowProducerQueue()
- if currentTimeWindow.CanUpload == "yes" {
- c_log.GlobalLogger.Info("从节点接收到可上传的timeWindow")
- }
- if currentTimeWindow.CanUpload == "no" {
- c_log.GlobalLogger.Info("从节点接收到不可上传的timeWindow")
- }
- // 2 timeWindow不可以上传,则将data目录下的数据move到copy目录
- bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
- for _, bag := range bags {
- bagTime := util.GetBagTime(bag)
- compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
- compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
- if compare1 && compare2 {
- // 将bag包移动到Copy目录
- domain.MoveFromDataToCopy(currentTimeWindow.FaultTime, commonConfig.CloudConfig.BagDataDir, bag, commonConfig.CloudConfig.BagCopyDir)
- } else {
- if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
- // 必须已经生成了窗口之后的包才算窗口结束了
- break
- }
- }
- }
- // 判断是否可上传
- if currentTimeWindow.CanUpload == "yes" {
- // 1 timeWindow可以上传
- c_log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
- // 补充bag包
- //domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
- // 将时间窗口加入运行队列
- entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
- }
- }
- }
- }
|