|
@@ -1,7 +1,7 @@
|
|
|
package svc
|
|
|
|
|
|
import (
|
|
|
- cfg2 "cicv-data-closedloop/kinglong/common/cfg"
|
|
|
+ commonConfig "cicv-data-closedloop/kinglong/common/cfg"
|
|
|
"cicv-data-closedloop/kinglong/common/global"
|
|
|
"cicv-data-closedloop/kinglong/common/log"
|
|
|
"cicv-data-closedloop/kinglong/common/util"
|
|
@@ -17,16 +17,18 @@ func RunTimeWindowConsumerQueue(nodeName string) {
|
|
|
outLoop:
|
|
|
for { // 串行处理
|
|
|
// 收到自杀信号
|
|
|
- signal := <-ChannelKillConsume
|
|
|
- if signal == 1 {
|
|
|
- ChannelKillConsume <- 1
|
|
|
- if len(global.TimeWindowConsumerQueue) == 0 {
|
|
|
+ select {
|
|
|
+ case signal := <-ChannelKillConsume:
|
|
|
+ if signal == 1 {
|
|
|
+ ChannelKillConsume <- 1
|
|
|
+ if len(global.TimeWindowConsumerQueue) == 0 {
|
|
|
+ AddKillTimes("5")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ } else { //signal == 2
|
|
|
AddKillTimes("5")
|
|
|
return
|
|
|
}
|
|
|
- } else { //signal == 2
|
|
|
- AddKillTimes("5")
|
|
|
- return
|
|
|
}
|
|
|
if len(global.TimeWindowConsumerQueue) > 0 {
|
|
|
// 1 获取即将处理的窗口
|
|
@@ -47,7 +49,7 @@ outLoop:
|
|
|
|
|
|
// 3 filter包,必须顺序执行
|
|
|
var filterTopics []string
|
|
|
- if nodeName == cfg2.CloudConfig.Hosts[0].Name {
|
|
|
+ if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
|
|
|
filterTopics = currentTimeWindow.MasterTopics
|
|
|
} else {
|
|
|
filterTopics = currentTimeWindow.SlaveTopics
|
|
@@ -86,13 +88,13 @@ outLoop:
|
|
|
// 5 upload,必须顺序执行
|
|
|
log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
|
start := time.Now()
|
|
|
- objectKey1 := cfg2.LocalConfig.OssBasePrefix + cfg2.LocalConfig.EquipmentNo + "/data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
- objectKey2 := cfg2.LocalConfig.OssBasePrefix + cfg2.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
|
|
|
- objectKey3 := cfg2.LocalConfig.OssBasePrefix + cfg2.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
+ objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
+ objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
|
|
|
+ objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
for i, bag := range bags {
|
|
|
bagSlice := strings.Split(bag, "/")
|
|
|
log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】->【", objectKey1+bagSlice[len(bagSlice)-1], "】")
|
|
|
- err := cfg2.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
|
|
|
+ err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
|
|
|
if err != nil {
|
|
|
log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
|
|
|
continue
|
|
@@ -109,18 +111,18 @@ outLoop:
|
|
|
callBackMap := map[string]interface{}{
|
|
|
"dataName": currentTimeWindow.FaultTime,
|
|
|
"dataSize": "", // 由合并程序补充
|
|
|
- "equipmentNo": cfg2.LocalConfig.EquipmentNo,
|
|
|
- "secretKey": cfg2.LocalConfig.SecretKey,
|
|
|
+ "equipmentNo": commonConfig.LocalConfig.EquipmentNo,
|
|
|
+ "secretKey": commonConfig.LocalConfig.SecretKey,
|
|
|
"rosBagPath": objectKey2,
|
|
|
"filePath": objectKey3,
|
|
|
- "taskId": cfg2.PlatformConfig.TaskConfigId,
|
|
|
+ "taskId": commonConfig.PlatformConfig.TaskConfigId,
|
|
|
"triggerId": triggerIds,
|
|
|
}
|
|
|
callBackJson, err := util.MapToJsonString(callBackMap)
|
|
|
if err != nil {
|
|
|
log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
|
|
|
}
|
|
|
- err = cfg2.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
|
|
|
+ err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
|
|
|
if err != nil {
|
|
|
log.GlobalLogger.Error("上传callback.json", callBackJson, "失败:", err)
|
|
|
}
|