package service import ( commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config" masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/config" "cicv-data-closedloop/common/config/c_log" "cicv-data-closedloop/common/domain" "cicv-data-closedloop/common/entity" commonUtil "cicv-data-closedloop/common/util" "encoding/json" "fmt" "strings" "time" ) // RunTimeWindowConsumerQueue 依次上传时间窗口 func RunTimeWindowConsumerQueue() { //nodeName:= commonConfig.LocalConfig.Node.Name c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动") outLoop: for { // 串行处理 // 收到自杀信号 select { case signal := <-ChannelKillConsume: c_log.GlobalLogger.Info("消费者队列接收到自杀信号:", signal) if signal == 1 { ChannelKillConsume <- 1 if len(entity.TimeWindowConsumerQueue) == 0 { AddKillTimes("5") return } } else { //signal == 2 AddKillTimes("5") return } default: } time.Sleep(time.Duration(1) * time.Second) waitLength := len(entity.TimeWindowConsumerQueue) if waitLength == 0 { continue outLoop } // 1 获取即将处理的窗口 currentTimeWindow := entity.TimeWindowConsumerQueue[0] entity.RemoveHeadOfTimeWindowConsumerQueue() c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length) // 2 获取目录和bags dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime) //bags, _ := commonUtil.ListAbsolutePathWithSuffixAndSort(dir, ".bag") // 3 如果不是全量采集,则使用 filter 命令对 bag 包进行主题过滤。 // todo 需要采集回传的话题是根据触发器设置的,触发器设置了之后进行过滤 //if commonConfig.CloudConfig.FullCollect == false { // filterTopics := commonConfig.CollectTopics // if nodeName == commonConfig.CloudConfig.Hosts[0].Name { // filterTopics = currentTimeWindow.MasterTopics // } else { // filterTopics = currentTimeWindow.SlaveTopics // } // var topicsFilterSlice []string // for _, topic := range filterTopics { // topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'") // } // for i, bag := range bags { // oldName := bag // newName := bag + "_filter" // filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""} // _, output, err := commonUtil.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, filterCommand...) // c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。") // if err != nil { // c_log.GlobalLogger.Errorf("filter 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err) // continue // } // // 删除旧文件 // _ = commonUtil.DeleteFile(oldName) // // 将新文件改回旧文件名 // if err = os.Rename(newName, oldName); err != nil { // c_log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err) // continue outLoop // } // } //} // //if commonConfig.CloudConfig.CompressBag { // // 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag // c_log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime) // for i, bag := range bags { // oldName := bag // compressCommand := []string{"compress", "--bz2", oldName} // c_log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。") // if _, output, err := commonUtil.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, compressCommand...); err != nil { // c_log.GlobalLogger.Errorf("compress 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err) // continue // } // } //} // 5 upload,必须顺序执行 //c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime) //start := time.Now() //objectKey1 := commonConfig.OssEquBasePrefix + "data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/" objectKey2 := commonConfig.OssEquBasePrefix + "data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", currentTimeWindow.Length) + ".bag" objectKey3 := commonConfig.OssEquBasePrefix + "data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", currentTimeWindow.Length) + "/" //for i, bag := range bags { // startOne := time.Now() // bagSlice := strings.Split(bag, "/") // for { // { // commonConfig.OssMutex.Lock() // err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag) // commonConfig.OssMutex.Unlock() // if err != nil { // c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err) // continue // } // } // c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】") // break // } //} //c_log.GlobalLogger.Info("上传完成,总耗时:", time.Since(start)) if commonConfig.LocalConfig.Node.Name == "node1" { // 在上传完成的包目录同级下添加一个目录同名的json var triggerIds []string for _, label := range currentTimeWindow.Labels { if triggerId, ok := masterConfig.LabelMapTriggerId.Load(label); !ok { c_log.GlobalLogger.Errorf("【label】=%v 没有对应的【triggerId】", label) } else { //c_log.GlobalLogger.Info("添加一个【triggerId】=", triggerId) triggerIds = append(triggerIds, commonUtil.ToString(triggerId)) } } //c_log.GlobalLogger.Info("json 中添加【triggerIds】=", triggerIds) callBackJsonBytes, _ := json.MarshalIndent(&entity.CallBack{ DataName: currentTimeWindow.FaultTime, DataSize: "", // 由合并程序补充 EquipmentNo: commonConfig.LocalConfig.EquipmentNo, SecretKey: commonConfig.LocalConfig.SecretKey, RosBagPath: objectKey2, FilePath: objectKey3, TaskId: commonConfig.PlatformConfig.TaskConfigId, TriggerId: triggerIds, }, "", " ") callBackJson := string(callBackJsonBytes) c_log.GlobalLogger.Info("【callBackJson】=", callBackJson) { //commonConfig.OssMutex.Lock() //err := commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson)) //commonConfig.OssMutex.Unlock() //if err != nil { // c_log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err) //} // 删除本地所有已上传的bag文件 //if err := commonUtil.RemoveDir(dir); err != nil { // continue outLoop //} commonUtil.WriteStringToFile(callBackJson, dir+"callback.json") } } c_log.GlobalLogger.Infof("结束处理窗口,【Label】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length) c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue)) } }