123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- package svc
- import (
- commonConfig "cicv-data-closedloop/kinglong/common/cfg"
- "cicv-data-closedloop/kinglong/common/global"
- "cicv-data-closedloop/kinglong/common/log"
- "cicv-data-closedloop/kinglong/common/util"
- masterCfg "cicv-data-closedloop/kinglong/master/pkg/cfg"
- "fmt"
- "os"
- "strings"
- "time"
- )
- func RunTimeWindowConsumerQueue(nodeName string) {
- log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
- outLoop:
- for { // 串行处理
- // 收到自杀信号
- select {
- case signal := <-ChannelKillConsume:
- if signal == 1 {
- ChannelKillConsume <- 1
- if len(global.TimeWindowConsumerQueue) == 0 {
- AddKillTimes("5")
- return
- }
- } else { //signal == 2
- AddKillTimes("5")
- return
- }
- default:
- }
- waitLength := len(global.TimeWindowConsumerQueue)
- log.GlobalLogger.Infof("待处理窗口个数为:%v", len(global.TimeWindowConsumerQueue))
- if waitLength > 0 {
- // 1 获取即将处理的窗口
- currentTimeWindow := global.TimeWindowConsumerQueue[0]
- util.RemoveHeaOfdTimeWindowConsumerQueue()
- log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
- // 2 获取目录
- dir := util.GetCopyDir(currentTimeWindow.FaultTime)
- bags := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
- bagNumber := len(bags)
- if bagNumber > currentTimeWindow.Length {
- bagNumber = currentTimeWindow.Length
- bags = bags[0 : currentTimeWindow.Length-1]
- }
- // 3 filter包,必须顺序执行
- var filterTopics []string
- if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
- filterTopics = currentTimeWindow.MasterTopics
- } else {
- filterTopics = currentTimeWindow.SlaveTopics
- }
- var topicsFilterSlice []string
- for _, topic := range filterTopics {
- topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
- }
- for _, bag := range bags {
- oldName := bag
- newName := bag + "_filter"
- filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
- log.GlobalLogger.Info("执行bag包过滤命令:", filterCommand)
- _, output, err := util.Execute("rosbag", filterCommand...)
- if err != nil {
- log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
- continue
- }
- log.GlobalLogger.Info("filter bag包:", oldName)
- // 删除旧文件
- util.DeleteFile(oldName)
- // 将新文件改回旧文件名
- if err = os.Rename(newName, oldName); err != nil {
- log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
- continue outLoop
- }
- }
- // 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
- log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
- for _, bag := range bags {
- oldName := bag
- compressCommand := []string{"compress", "--bz2", oldName}
- if _, output, err := util.Execute("rosbag", compressCommand...); err != nil {
- log.GlobalLogger.Errorf("compress命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
- continue
- }
- }
- // 5 upload,必须顺序执行
- log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
- start := time.Now()
- objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
- objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
- objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
- for i, bag := range bags {
- bagSlice := strings.Split(bag, "/")
- log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】->【", objectKey1+bagSlice[len(bagSlice)-1], "】")
- err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
- if err != nil {
- log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
- continue
- }
- }
- log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
- // 在上传完成的包目录同级下添加一个目录同名的json
- triggerIds := make([]string, 0)
- for _, label := range currentTimeWindow.Labels {
- triggerIdToAppend := masterCfg.LabelMapTriggerId[label]
- log.GlobalLogger.Info("添加一个【triggerId】=", triggerIdToAppend)
- triggerIds = append(triggerIds, triggerIdToAppend)
- }
- callBackMap := map[string]interface{}{
- "dataName": currentTimeWindow.FaultTime,
- "dataSize": "", // 由合并程序补充
- "equipmentNo": commonConfig.LocalConfig.EquipmentNo,
- "secretKey": commonConfig.LocalConfig.SecretKey,
- "rosBagPath": objectKey2,
- "filePath": objectKey3,
- "taskId": commonConfig.PlatformConfig.TaskConfigId,
- "triggerId": triggerIds,
- }
- callBackJson, err := util.MapToJsonString(callBackMap)
- if err != nil {
- log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
- }
- err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
- if err != nil {
- log.GlobalLogger.Error("上传callback.json", callBackJson, "失败:", err)
- }
- // 删除本地所有已上传的bag文件
- log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
- if err = util.RemoveDir(dir); err != nil {
- continue outLoop
- }
- }
- // 每一秒扫一次
- time.Sleep(time.Duration(1) * time.Second)
- }
- }
|