|
@@ -0,0 +1,250 @@
|
|
|
+package service
|
|
|
+
|
|
|
+import (
|
|
|
+ commonConfig "cicv-data-closedloop/aarch64/pjibot_clean/common/config"
|
|
|
+ masterConfig "cicv-data-closedloop/aarch64/pjibot_clean/master/package/config"
|
|
|
+ "cicv-data-closedloop/common/config/c_log"
|
|
|
+ "cicv-data-closedloop/common/domain"
|
|
|
+ "cicv-data-closedloop/common/entity"
|
|
|
+ "cicv-data-closedloop/common/util"
|
|
|
+ commonUtil "cicv-data-closedloop/common/util"
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
+ "os"
|
|
|
+ "path/filepath"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+func RunTimeWindowConsumerQueue(nodeName string) {
|
|
|
+ c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
|
|
|
+outLoop:
|
|
|
+ for {
|
|
|
+
|
|
|
+
|
|
|
+ select {
|
|
|
+ case signal := <-ChannelKillConsume:
|
|
|
+ if signal == 1 {
|
|
|
+ ChannelKillConsume <- 1
|
|
|
+ if len(entity.TimeWindowConsumerQueue) == 0 {
|
|
|
+ AddKillTimes("5")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ AddKillTimes("5")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ }
|
|
|
+
|
|
|
+ time.Sleep(time.Duration(1) * time.Second)
|
|
|
+
|
|
|
+ waitLength := len(entity.TimeWindowConsumerQueue)
|
|
|
+ if waitLength == 0 {
|
|
|
+ continue outLoop
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ currentTimeWindow := entity.TimeWindowConsumerQueue[0]
|
|
|
+ entity.RemoveHeadOfTimeWindowConsumerQueue()
|
|
|
+ c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
|
|
|
+
|
|
|
+ dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
|
|
|
+ bags, _ := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
|
|
|
+ bagNumber := len(bags)
|
|
|
+ if bagNumber > currentTimeWindow.Length {
|
|
|
+ bagNumber = currentTimeWindow.Length
|
|
|
+ bags = bags[0:currentTimeWindow.Length]
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if commonConfig.CloudConfig.FullCollect == false {
|
|
|
+ var filterTopics []string
|
|
|
+ if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
|
|
|
+ filterTopics = currentTimeWindow.MasterTopics
|
|
|
+ } else {
|
|
|
+ filterTopics = currentTimeWindow.SlaveTopics
|
|
|
+ }
|
|
|
+ var topicsFilterSlice []string
|
|
|
+ for _, topic := range filterTopics {
|
|
|
+ topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
|
|
|
+ }
|
|
|
+ for i, bag := range bags {
|
|
|
+ oldName := bag
|
|
|
+ newName := bag + "_filter"
|
|
|
+ filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
|
|
|
+ _, output, err := util.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, filterCommand...)
|
|
|
+ c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ util.DeleteFile(oldName)
|
|
|
+
|
|
|
+ if err = os.Rename(newName, oldName); err != nil {
|
|
|
+ c_log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
|
|
|
+ continue outLoop
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
|
|
|
+ start := time.Now()
|
|
|
+ objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
+ objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
|
|
|
+ objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
|
|
|
+ for i, bag := range bags {
|
|
|
+ startOne := time.Now()
|
|
|
+ bagSlice := strings.Split(bag, "/")
|
|
|
+ for {
|
|
|
+ commonConfig.OssMutex.Lock()
|
|
|
+ err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
|
|
|
+ commonConfig.OssMutex.Unlock()
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ c_log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
|
|
|
+
|
|
|
+ triggerIds := make([]string, 0)
|
|
|
+ for _, label := range currentTimeWindow.Labels {
|
|
|
+ if value, ok := masterConfig.LabelMapTriggerId.Load(label); ok {
|
|
|
+ triggerIds = append(triggerIds, value.(string))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ callBackMap := map[string]interface{}{
|
|
|
+ "dataName": currentTimeWindow.FaultTime,
|
|
|
+ "dataSize": "",
|
|
|
+ "equipmentNo": commonConfig.LocalConfig.EquipmentNo,
|
|
|
+ "secretKey": commonConfig.LocalConfig.SecretKey,
|
|
|
+ "rosBagPath": objectKey2,
|
|
|
+ "filePath": objectKey3,
|
|
|
+ "taskId": commonConfig.PlatformConfig.TaskConfigId,
|
|
|
+ "triggerId": triggerIds,
|
|
|
+ }
|
|
|
+ callBackJson, err := util.MapToJsonString(callBackMap)
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
|
|
|
+ }
|
|
|
+ commonConfig.OssMutex.Lock()
|
|
|
+
|
|
|
+ err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Error("上传 callback.json 文件失败:", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, file := range commonConfig.CloudConfig.MapBufFiles {
|
|
|
+ err = commonConfig.OssBucket.PutObjectFromFile(objectKey3+filepath.Base(file), file)
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Error("上传 mapBuf 文件失败:", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ {
|
|
|
+
|
|
|
+ util.DeleteFileIfExists(commonConfig.CloudConfig.DataDir.Dest)
|
|
|
+ c_log.GlobalLogger.Infof("旧的data目录压缩包【%v】已删除。", commonConfig.CloudConfig.DataDir.Dest)
|
|
|
+
|
|
|
+ err = util.ZipDir(commonConfig.CloudConfig.DataDir.Src, commonConfig.CloudConfig.DataDir.Dest, commonConfig.CloudConfig.DataDir.SrcSub)
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Error("压缩data目录失败:", err)
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Infof("压缩data目录【%v】->【%v】成功", commonConfig.CloudConfig.DataDir.Src, commonConfig.CloudConfig.DataDir.Dest)
|
|
|
+ dataZipKey := objectKey3 + "data.zip"
|
|
|
+ err = commonConfig.OssBucket.PutObjectFromFile(dataZipKey, commonConfig.CloudConfig.DataDir.Dest)
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Error("上传data目录压缩文件失败:", err)
|
|
|
+ } else {
|
|
|
+ c_log.GlobalLogger.Infof("上传data目录压缩包【%v】->【%v】成功", commonConfig.CloudConfig.DataDir.Dest, dataZipKey)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ commonConfig.OssMutex.Unlock()
|
|
|
+ }
|
|
|
+
|
|
|
+ {
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ collectNumPlus()
|
|
|
+
|
|
|
+ c_log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
|
|
|
+ c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
|
|
|
+ if err = util.RemoveDir(dir); err != nil {
|
|
|
+ goto outLoop
|
|
|
+ }
|
|
|
+ if len(entity.TimeWindowConsumerQueue) == 0 {
|
|
|
+ c_log.GlobalLogger.Infof("已处理所有窗口,重启 record 命令。")
|
|
|
+ ChannelKillRosRecord <- 2
|
|
|
+ entity.ProcessingFlag = false
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func collectNumPlus() {
|
|
|
+ responseString, err := commonUtil.HttpPostJsonWithHeaders(
|
|
|
+ commonConfig.CloudConfig.CollectNumPlus.Url,
|
|
|
+ map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
|
|
|
+ map[string]string{
|
|
|
+ "snCode": commonConfig.LocalConfig.SecretKey,
|
|
|
+ },
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Error("发送http请求修改采集数量失败:", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ var resp entity.Response
|
|
|
+ err = json.Unmarshal([]byte(responseString), &resp)
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Error("解析修改采集数量结果失败:", err)
|
|
|
+ }
|
|
|
+}
|