|
@@ -1,23 +1,24 @@
|
|
package svc
|
|
package svc
|
|
|
|
|
|
import (
|
|
import (
|
|
- commonConfig "cicv-data-closedloop/kinglong/common/cfg"
|
|
|
|
- "cicv-data-closedloop/kinglong/common/global"
|
|
|
|
- "cicv-data-closedloop/kinglong/common/log"
|
|
|
|
- commonService "cicv-data-closedloop/kinglong/common/svc"
|
|
|
|
- "cicv-data-closedloop/kinglong/common/util"
|
|
|
|
|
|
+ commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
|
|
|
|
+ commonService "cicv-data-closedloop/aarch64/pjisuv/common/service"
|
|
|
|
+ "cicv-data-closedloop/common/config/c_log"
|
|
|
|
+ "cicv-data-closedloop/common/domain"
|
|
|
|
+ "cicv-data-closedloop/common/entity"
|
|
|
|
+ "cicv-data-closedloop/common/util"
|
|
"time"
|
|
"time"
|
|
)
|
|
)
|
|
|
|
|
|
// RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
|
|
// RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
|
|
func RunTimeWindowProducerQueue() {
|
|
func RunTimeWindowProducerQueue() {
|
|
- log.GlobalLogger.Info("生产者队列 - 启动")
|
|
|
|
|
|
+ c_log.GlobalLogger.Info("生产者队列 - 启动")
|
|
for { // 必须串行排队处理
|
|
for { // 必须串行排队处理
|
|
select {
|
|
select {
|
|
case signal := <-commonService.ChannelKillMove:
|
|
case signal := <-commonService.ChannelKillMove:
|
|
if signal == 1 {
|
|
if signal == 1 {
|
|
commonService.ChannelKillMove <- 1
|
|
commonService.ChannelKillMove <- 1
|
|
- if len(global.TimeWindowProducerQueue) == 0 {
|
|
|
|
|
|
+ if len(entity.TimeWindowProducerQueue) == 0 {
|
|
commonService.AddKillTimes("4")
|
|
commonService.AddKillTimes("4")
|
|
return
|
|
return
|
|
}
|
|
}
|
|
@@ -28,26 +29,26 @@ func RunTimeWindowProducerQueue() {
|
|
default:
|
|
default:
|
|
}
|
|
}
|
|
time.Sleep(time.Duration(1) * time.Second)
|
|
time.Sleep(time.Duration(1) * time.Second)
|
|
- if len(global.TimeWindowProducerQueue) > 0 {
|
|
|
|
- currentTimeWindow := global.TimeWindowProducerQueue[0]
|
|
|
|
|
|
+ if len(entity.TimeWindowProducerQueue) > 0 {
|
|
|
|
+ currentTimeWindow := entity.TimeWindowProducerQueue[0]
|
|
// 将时间窗口移出准备队列
|
|
// 将时间窗口移出准备队列
|
|
- util.RemoveHeadOfdTimeWindowProducerQueue()
|
|
|
|
|
|
+ entity.RemoveHeadOfdTimeWindowProducerQueue()
|
|
if currentTimeWindow.CanUpload == "yes" {
|
|
if currentTimeWindow.CanUpload == "yes" {
|
|
- log.GlobalLogger.Info("从节点接收到可上传的timeWindow")
|
|
|
|
|
|
+ c_log.GlobalLogger.Info("从节点接收到可上传的timeWindow")
|
|
}
|
|
}
|
|
if currentTimeWindow.CanUpload == "no" {
|
|
if currentTimeWindow.CanUpload == "no" {
|
|
- log.GlobalLogger.Info("从节点接收到不可上传的timeWindow")
|
|
|
|
|
|
+ c_log.GlobalLogger.Info("从节点接收到不可上传的timeWindow")
|
|
}
|
|
}
|
|
|
|
|
|
// 2 timeWindow不可以上传,则将data目录下的数据move到copy目录
|
|
// 2 timeWindow不可以上传,则将data目录下的数据move到copy目录
|
|
- bags := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
|
|
|
|
|
|
+ bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
|
|
for _, bag := range bags {
|
|
for _, bag := range bags {
|
|
bagTime := util.GetBagTime(bag)
|
|
bagTime := util.GetBagTime(bag)
|
|
compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
|
|
compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
|
|
compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
|
|
compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
|
|
if compare1 && compare2 {
|
|
if compare1 && compare2 {
|
|
// 将bag包移动到Copy目录
|
|
// 将bag包移动到Copy目录
|
|
- util.MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
|
|
|
|
|
|
+ domain.MoveFromDataToCopy(currentTimeWindow.FaultTime, commonConfig.CloudConfig.BagDataDir, bag, commonConfig.CloudConfig.BagCopyDir)
|
|
} else {
|
|
} else {
|
|
if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
|
|
if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
|
|
// 必须已经生成了窗口之后的包才算窗口结束了
|
|
// 必须已经生成了窗口之后的包才算窗口结束了
|
|
@@ -58,11 +59,11 @@ func RunTimeWindowProducerQueue() {
|
|
// 判断是否可上传
|
|
// 判断是否可上传
|
|
if currentTimeWindow.CanUpload == "yes" {
|
|
if currentTimeWindow.CanUpload == "yes" {
|
|
// 1 timeWindow可以上传
|
|
// 1 timeWindow可以上传
|
|
- log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
|
|
|
|
|
|
+ c_log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
|
|
// 补充bag包
|
|
// 补充bag包
|
|
- util.SupplyCopyBags(currentTimeWindow)
|
|
|
|
|
|
+ domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
|
|
// 将时间窗口加入运行队列
|
|
// 将时间窗口加入运行队列
|
|
- util.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
|
|
|
|
|
|
+ entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|