|
@@ -11,11 +11,11 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
-// RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
|
|
|
+// RunTimeWindowProducerQueue 依次处理时间窗口,从data目录移动到copy目录
|
|
|
func RunTimeWindowProducerQueue() {
|
|
|
c_log.GlobalLogger.Info("生产者队列goroutine - 启动")
|
|
|
for {
|
|
|
- // 收到自杀信号
|
|
|
+ // 1 监控自杀信号
|
|
|
select {
|
|
|
case signal := <-commonService.ChannelKillMove:
|
|
|
if signal == 1 {
|
|
@@ -30,58 +30,51 @@ func RunTimeWindowProducerQueue() {
|
|
|
}
|
|
|
default:
|
|
|
}
|
|
|
-
|
|
|
time.Sleep(time.Duration(1) * time.Second)
|
|
|
- if len(entity.TimeWindowProducerQueue) > 0 {
|
|
|
- bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
|
|
|
- currentTimeWindow := entity.TimeWindowProducerQueue[0]
|
|
|
- move := false
|
|
|
- bigger := false
|
|
|
- for _, bag := range bags {
|
|
|
- bagTime := util.GetBagTime(bag)
|
|
|
- // 2 如果bag不小于timeWindowBegin不大于timeWindowEnd,则移动
|
|
|
- compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
|
|
|
- compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
|
|
|
- if compare1 && compare2 {
|
|
|
- // 将bag包移动到Copy目录
|
|
|
- domain.MoveFromDataToCopy(currentTimeWindow.FaultTime, commonConfig.CloudConfig.BagDataDir, bag, commonConfig.CloudConfig.BagCopyDir)
|
|
|
- move = true
|
|
|
- } else {
|
|
|
- if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
|
|
|
- // 必须已经生成了窗口之后的包才算窗口结束了
|
|
|
- bigger = true
|
|
|
- break
|
|
|
- }
|
|
|
+ // 2 处理
|
|
|
+ if len(entity.TimeWindowProducerQueue) == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ currentTimeWindow := entity.TimeWindowProducerQueue[0] // 当前窗口
|
|
|
+ bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag") // data目录的bag包列表
|
|
|
+ move := false
|
|
|
+ bigger := false
|
|
|
+ for _, bag := range bags {
|
|
|
+ bagTime := util.GetBagTime(bag) // bag包对应的时间
|
|
|
+ // 3 如果bag时间在窗口时间区间内,则从data移动到copy
|
|
|
+ if util.BagTimeInInterval(bagTime, currentTimeWindow.TimeWindowBegin, currentTimeWindow.TimeWindowEnd) {
|
|
|
+ domain.MoveFromDataToCopy(currentTimeWindow.FaultTime, commonConfig.CloudConfig.BagDataDir, bag, commonConfig.CloudConfig.BagCopyDir)
|
|
|
+ move = true
|
|
|
+ } else {
|
|
|
+ // 4 如果已经生成了新的包,则当前窗口已经移动完成了
|
|
|
+ if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
|
|
|
+ bigger = true
|
|
|
+ break
|
|
|
}
|
|
|
}
|
|
|
- // 如果没有包可以供当前窗口移动,且已经生成了更新的包,则当前窗口已经可以上传
|
|
|
- if !move && bigger {
|
|
|
- // 1 如果第一个已经大于了timeWindowEnd,则触发上传并删除
|
|
|
- // 将时间窗口发送给从节点
|
|
|
- currentTimeWindow.CanUpload = "yes"
|
|
|
- c_log.GlobalLogger.Info("将已完成的窗口发送给从节点:", currentTimeWindow.CanUpload)
|
|
|
- domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
|
|
|
+ }
|
|
|
+ // 5 如果没有包可以供当前窗口移动,且已经生成了更新的包,则当前窗口已经可以上传
|
|
|
+ if !move && bigger {
|
|
|
+ currentTimeWindow.CanUpload = "yes"
|
|
|
+ // 如果第一个已经大于了timeWindowEnd,则触发上传并删除
|
|
|
+ domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
|
|
|
+ entity.RefreshTcpSendTime()
|
|
|
+ go sendTimeWindowByTcp(currentTimeWindow)
|
|
|
+ entity.RemoveHeadOfdTimeWindowProducerQueue()
|
|
|
+ entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
|
|
|
+ // 获取copy目录下的字典json,key为触发时间,value为label
|
|
|
+ timeToLabelJson, _ := util.ReadFile(commonConfig.CloudConfig.TimeToLabelJsonPath)
|
|
|
+ timeToLabelMap, _ := util.JsonStringToMap(timeToLabelJson)
|
|
|
+ timeToLabelMap[currentTimeWindow.FaultTime] = util.ToString(currentTimeWindow.Labels)
|
|
|
+ timeToLabelJson, _ = util.MapToJsonString(timeToLabelMap)
|
|
|
+ _ = util.WriteFile(timeToLabelJson, commonConfig.CloudConfig.TimeToLabelJsonPath)
|
|
|
+ continue
|
|
|
+ } else { // 保证当前窗口只发送一次,每间隔5秒发一次非yes窗口
|
|
|
+ if int(time.Since(entity.TcpSendTime).Seconds()) > commonConfig.CloudConfig.TimeWindowSendGap {
|
|
|
+ currentTimeWindow.CanUpload = "no"
|
|
|
+ c_log.GlobalLogger.Info("每隔", commonConfig.CloudConfig.TimeWindowSendGap, "秒发送一次tcp消息")
|
|
|
entity.RefreshTcpSendTime()
|
|
|
go sendTimeWindowByTcp(currentTimeWindow)
|
|
|
- // 将时间窗口移出准备队列
|
|
|
- entity.RemoveHeadOfdTimeWindowProducerQueue()
|
|
|
- // 将时间窗口加入运行队列
|
|
|
- entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
|
|
|
- // 获取copy目录下的字典json,key为触发时间,value为label
|
|
|
- timeToLabelJson, _ := util.ReadFile(commonConfig.CloudConfig.TimeToLabelJsonPath)
|
|
|
- timeToLabelMap, _ := util.JsonStringToMap(timeToLabelJson)
|
|
|
- timeToLabelMap[currentTimeWindow.FaultTime] = util.ToString(currentTimeWindow.Labels)
|
|
|
- timeToLabelJson, _ = util.MapToJsonString(timeToLabelMap)
|
|
|
- _ = util.WriteFile(timeToLabelJson, commonConfig.CloudConfig.TimeToLabelJsonPath)
|
|
|
- continue
|
|
|
- } else { // 保证当前窗口只发送一次,每间隔5秒发一次
|
|
|
- if int(time.Since(entity.TcpSendTime).Seconds()) > commonConfig.CloudConfig.TimeWindowSendGap {
|
|
|
- c_log.GlobalLogger.Info("每隔", commonConfig.CloudConfig.TimeWindowSendGap, "秒发送一次tcp消息")
|
|
|
- entity.RefreshTcpSendTime()
|
|
|
- // 2 如果第一个不大于timeWindowEnd,则发送不可上传的窗口信息。
|
|
|
- currentTimeWindow.CanUpload = "no"
|
|
|
- go sendTimeWindowByTcp(currentTimeWindow)
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
}
|