package svc

import (
	commonConfig "cicv-data-closedloop/pji/common/cfg"
	"cicv-data-closedloop/pji/common/global"
	"cicv-data-closedloop/pji/common/log"
	"cicv-data-closedloop/pji/common/util"
	masterConfig "cicv-data-closedloop/pji/master/pkg/cfg"
	"fmt"
	"os"
	"strings"
	"time"
)

// RunTimeWindowConsumerQueue consumes global.TimeWindowConsumerQueue. For each
// time window it filters, compresses and uploads the window's bag files to
// OSS, uploads a callback.json describing the window, and removes the local
// copy directory.
//
// nodeName selects the topic list used for filtering: the window's
// MasterTopics when nodeName equals the name of the first configured cloud
// host, its SlaveTopics otherwise.
//
// Every iteration starts with a receive from ChannelKillConsume. Signal 1 is
// put back on the channel and the function returns once the queue is empty;
// any other signal makes it return immediately. AddKillTimes("5") is called
// before returning in both cases.
//
// NOTE(review): the receive is blocking, so no window is processed until a
// signal arrives. That looks at odds with the once-per-second polling at the
// bottom of the loop — confirm whether a non-blocking select was intended.
func RunTimeWindowConsumerQueue(nodeName string) {
	log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
outLoop:
	for {
		// Wait for a kill signal.
		signal := <-ChannelKillConsume
		if signal != 1 {
			// signal == 2: stop without looking at the queue.
			AddKillTimes("5")
			return
		}
		// signal == 1: put the signal back so it is received again later, and
		// stop only once the queue has been drained.
		ChannelKillConsume <- 1
		if len(global.TimeWindowConsumerQueue) == 0 {
			AddKillTimes("5")
			return
		}

		// NOTE(review): the queue is read here without any visible
		// synchronisation — confirm how producers guard it.
		if len(global.TimeWindowConsumerQueue) > 0 {
			// 1. Take the window at the head of the queue.
			currentTimeWindow := global.TimeWindowConsumerQueue[0]
			util.RemoveHeaOfdTimeWindowConsumerQueue()
			log.GlobalLogger.Info("即将消费窗口:", currentTimeWindow)

			// 2. List the window's bag files, keeping at most Length of them.
			dir := util.GetCopyDir(currentTimeWindow.FaultTime)
			bags := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
			bagNumber := len(bags)
			if bagNumber > currentTimeWindow.Length {
				bagNumber = currentTimeWindow.Length
				// The upper bound of a slice expression is exclusive, so this
				// keeps exactly bagNumber bags, matching the count reported
				// in the object keys and the progress log.
				bags = bags[:bagNumber]
			}

			// 3. Filter every bag down to the configured topics; must run
			// sequentially.
			var filterTopics []string
			if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
				filterTopics = currentTimeWindow.MasterTopics
			} else {
				filterTopics = currentTimeWindow.SlaveTopics
			}
			topicsFilterSlice := make([]string, 0, len(filterTopics))
			for _, topic := range filterTopics {
				topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
			}
			// The expression is the same for every bag, so build it once.
			// NOTE(review): it is wrapped in literal double quotes; whether
			// that is required depends on how util.Execute runs the command.
			filterExpr := "\"" + strings.Join(topicsFilterSlice, " or ") + "\""
			for _, bag := range bags {
				filtered := bag + "_filter"
				log.GlobalLogger.Info("filter bag包:", bag)
				util.Execute("rosbag", "filter", bag, filtered, filterExpr)
				// os.Rename replaces an existing destination file, so the
				// original bag is not deleted beforehand: if the filter
				// produced no output, the original stays on disk.
				if err := os.Rename(filtered, bag); err != nil {
					log.GlobalLogger.Info("修改文件名", bag, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
					continue outLoop
				}
			}

			// 4. Compress every bag; must run sequentially. Per the original
			// comment, each bag ends up as a compressed bag plus the original
			// with an .orig.bag suffix.
			log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
			for _, bag := range bags {
				util.Execute("rosbag", "compress", "--bz2", bag)
			}

			// 5. Upload; must run sequentially.
			log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
			start := time.Now()
			keyPrefix := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo
			windowKey := currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber)
			objectKey1 := keyPrefix + "/data/" + windowKey + "/"
			objectKey2 := keyPrefix + "/data_merge/" + windowKey + ".bag"
			objectKey3 := keyPrefix + "/data_parse/" + windowKey + "/"

			for i, bag := range bags {
				// Object name is the part of the path after the last "/".
				objectKey := objectKey1 + bag[strings.LastIndex(bag, "/")+1:]
				log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】->【", objectKey, "】")
				if err := commonConfig.OssBucket.PutObjectFromFile(objectKey, bag); err != nil {
					// A failed upload is logged and the remaining bags are
					// still attempted.
					log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
				}
			}
			log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))

			// Upload a callback.json describing this window under the
			// data_parse prefix.
			triggerIds := make([]string, 0, len(currentTimeWindow.Labels))
			for _, label := range currentTimeWindow.Labels {
				triggerIds = append(triggerIds, masterConfig.LabelMapTriggerId[label])
			}
			callBackMap := map[string]interface{}{
				"dataName":    currentTimeWindow.FaultTime, // per the original comment, the cloud callback program adds 8 hours to this value to account for the UTC offset
				"dataSize":    "",                          // filled in by the merge program
				"equipmentNo": commonConfig.LocalConfig.EquipmentNo,
				"secretKey":   commonConfig.LocalConfig.SecretKey,
				"rosBagPath":  objectKey2,
				"filePath":    objectKey3,
				"taskId":      commonConfig.PlatformConfig.TaskConfigId,
				"triggerId":   triggerIds,
			}
			callBackJson, err := util.MapToJsonString(callBackMap)
			if err != nil {
				// Do not upload a callback.json when serialisation failed.
				log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
			} else if err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson)); err != nil {
				log.GlobalLogger.Error("上传callback.json", callBackJson, "失败:", err)
			}

			// Remove the local copy directory of this window.
			// NOTE(review): this also runs when an upload above failed —
			// confirm that dropping the local data in that case is intended.
			util.RemoveDir(dir)
			log.GlobalLogger.Info(" -------- 处理窗口:", currentTimeWindow.FaultTime, " - 结束 -------")
		}
		// Pause one second between iterations.
		time.Sleep(time.Second)
	}
}