package service

import (
	commonConfig "cicv-data-closedloop/aarch64/pjibot_guide/common/config"
	masterConfig "cicv-data-closedloop/aarch64/pjibot_guide/master/package/config"
	"cicv-data-closedloop/common/config/c_log"
	"cicv-data-closedloop/common/domain"
	"cicv-data-closedloop/common/entity"
	"cicv-data-closedloop/common/util"
	commonUtil "cicv-data-closedloop/common/util"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"
)

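// RunTimeWindowConsumerQueue drains entity.TimeWindowConsumerQueue: for each time window it
// optionally filters the recorded bag files by topic, uploads them to OSS together with
// callback.json, the mapBuf files and a compressed copy of the data directory, reports the
// collection count to the platform, and finally removes the local copies. It exits when a kill
// signal arrives on ChannelKillConsume (after draining the queue for signal 1, immediately for
// signal 2). It is typically started as a goroutine, e.g. `go RunTimeWindowConsumerQueue(nodeName)`
// (illustrative call; the actual launch site lives elsewhere in the repo).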
func RunTimeWindowConsumerQueue(nodeName string) {
	c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
outLoop:
	for {

		// Check for a kill signal: 1 means drain the remaining queue before exiting, 2 means exit immediately.
		select {
		case signal := <-ChannelKillConsume:
			if signal == 1 {
				ChannelKillConsume <- 1
				if len(entity.TimeWindowConsumerQueue) == 0 {
					AddKillTimes("5")
					return
				}
			} else { //signal == 2
				AddKillTimes("5")
				return
			}
		default:
		}
		// Scan once per second.
		time.Sleep(time.Second)

		waitLength := len(entity.TimeWindowConsumerQueue)
		if waitLength == 0 {
			continue outLoop
		}

		// 1. Take the next time window to be processed.
		currentTimeWindow := entity.TimeWindowConsumerQueue[0]
		entity.RemoveHeadOfTimeWindowConsumerQueue()
		c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
		// 2. Get the directory holding the copied bag files of this window.
		dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
		bags, err := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
		if err != nil {
			c_log.GlobalLogger.Error("Failed to list .bag files in ", dir, ": ", err)
		}
		bagNumber := len(bags)
		if bagNumber > currentTimeWindow.Length {
			bagNumber = currentTimeWindow.Length
			bags = bags[0:currentTimeWindow.Length]
		}

		// 3. If this is not a full collection, use the rosbag filter command to keep only the selected topics in each bag.
		if !commonConfig.CloudConfig.FullCollect {
			var filterTopics []string
			if nodeName == commonConfig.CloudConfig.Hosts[0].Name {
				filterTopics = currentTimeWindow.MasterTopics
			} else {
				filterTopics = currentTimeWindow.SlaveTopics
			}
			var topicsFilterSlice []string
			for _, topic := range filterTopics {
				topicsFilterSlice = append(topicsFilterSlice, "topic=='"+topic+"'")
			}
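			// The slice above is joined with " or " and wrapped in double quotes to form the rosbag filter
			// expression, e.g. (illustrative topic names) "topic=='/tf' or topic=='/imu'".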
			for i, bag := range bags {
				oldName := bag
				newName := bag + "_filter"
				filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
				_, output, err := util.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, filterCommand...)
				c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
				if err != nil {
					c_log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
					continue
				}
				// Delete the old file.
				util.DeleteFile(oldName)
				// Rename the filtered file back to the original name.
				if err = os.Rename(newName, oldName); err != nil {
					c_log.GlobalLogger.Info("Failed to rename file ", oldName, ", giving up current time window ", currentTimeWindow.FaultTime, ", error: ", err)
					continue outLoop
				}
			}
		}

		// 4. compress the bags (must run sequentially). Each bag would produce a compressed bag plus the original kept with an .orig.bag suffix.
		// TODO: the compression step is disabled on the robot to avoid saturating the CPU.
		//c_log.GlobalLogger.Info("压缩 bag 数据包,故障时间为:", currentTimeWindow.FaultTime)
		//for i, bag := range bags {
		//	oldName := bag
		//	compressCommand := []string{"compress", "--bz2", oldName}
		//	c_log.GlobalLogger.Info("Compressing,【FaultTime】=", currentTimeWindow.FaultTime, ",【Label】=", currentTimeWindow.Labels, ", progress ", i+1, "/", bagNumber, ".")
		//	if _, output, err := util.ExecuteWithEnvSync(commonConfig.RosbagEnvs, commonConfig.RosbagPath, compressCommand...); err != nil {
		//		c_log.GlobalLogger.Errorf("compress command failed,【command】=%v,【output】=%v,【err】=%v", compressCommand, output, err)
		//		continue
		//	}
		//}
		// 5. Upload the bags; must run sequentially.
		c_log.GlobalLogger.Info("Uploading bag files, fault time: ", currentTimeWindow.FaultTime)
		start := time.Now()
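		// The three object keys below follow this layout (illustrative example for a window with two labels and 3 bags):
		//   <OssBasePrefix><EquipmentNo>/data/<FaultTime>_<label1>_<label2>_3/          raw bags, uploaded one by one
		//   <OssBasePrefix><EquipmentNo>/data_merge/<FaultTime>_<label1>_<label2>_3.bag merged bag, presumably produced by the downstream merge program
		//   <OssBasePrefix><EquipmentNo>/data_parse/<FaultTime>_<label1>_<label2>_3/    callback.json, mapBuf files and data.zip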
		objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
		objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
		objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
		for i, bag := range bags {
			startOne := time.Now()
			bagSlice := strings.Split(bag, "/")
			for {
				commonConfig.OssMutex.Lock()
				err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
				commonConfig.OssMutex.Unlock()
				if err != nil {
					c_log.GlobalLogger.Info("Failed to upload bag ", bag, " (likely a network issue); will retry after the network recovers: ", err)
					// Back off briefly before retrying so the loop does not spin while the network is down.
					time.Sleep(time.Second)
					continue
				}
				c_log.GlobalLogger.Info("Upload took ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, ",【Label】=", currentTimeWindow.Labels, ", progress ", i+1, "/", bagNumber, ".【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
				break
			}
		}
		c_log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
		// 在上传完成的包目录同级下添加一个目录同名的json
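		// Look up the trigger id registered for each label in LabelMapTriggerId; labels without a mapping are skipped.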
		triggerIds := make([]string, 0)
		for _, label := range currentTimeWindow.Labels {
			if value, ok := masterConfig.LabelMapTriggerId.Load(label); ok {
				triggerIds = append(triggerIds, value.(string))
			}
		}
		callBackMap := map[string]interface{}{
			"dataName":    currentTimeWindow.FaultTime, // 云端callback程序会将该值加8小时,因为UTC和CSV时区相差8小时
			"dataSize":    "",                          // 由合并程序补充
			"equipmentNo": commonConfig.LocalConfig.EquipmentNo,
			"secretKey":   commonConfig.LocalConfig.SecretKey,
			"rosBagPath":  objectKey2,
			"filePath":    objectKey3,
			"taskId":      commonConfig.PlatformConfig.TaskConfigId,
			"triggerId":   triggerIds,
		}
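		// The resulting callback.json looks roughly like this (values illustrative):
		//   {"dataName":"<FaultTime>","dataSize":"","equipmentNo":"...","secretKey":"...",
		//    "rosBagPath":"<objectKey2>","filePath":"<objectKey3>","taskId":"...","triggerId":["..."]}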
		callBackJson, err := util.MapToJsonString(callBackMap)
		if err != nil {
			c_log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
		}
		commonConfig.OssMutex.Lock()
		// Upload callback.json
		err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
		if err != nil {
			c_log.GlobalLogger.Error("上传 callback.json 文件失败:", err)
		}
		// Additionally collect the mapBuf files.
		for _, file := range commonConfig.CloudConfig.MapBufFiles {
			err = commonConfig.OssBucket.PutObjectFromFile(objectKey3+filepath.Base(file), file)
			if err != nil {
				c_log.GlobalLogger.Error("上传 mapBuf 文件失败:", err)
			}
		}

		// Compress and collect the data directory.
		{
			// 1. If data.zip already exists, delete it first.
			util.DeleteFileIfExists(commonConfig.CloudConfig.DataDir.Dest)
			c_log.GlobalLogger.Infof("Old data directory archive【%v】has been deleted.", commonConfig.CloudConfig.DataDir.Dest)
			// 2. Re-compress to generate data.zip.
			err = util.ZipDir(commonConfig.CloudConfig.DataDir.Src, commonConfig.CloudConfig.DataDir.Dest, commonConfig.CloudConfig.DataDir.SrcSub)
			if err != nil {
				c_log.GlobalLogger.Error("压缩data目录失败:", err)
			} else {
				c_log.GlobalLogger.Infof("压缩data目录【%v】->【%v】成功", commonConfig.CloudConfig.DataDir.Src, commonConfig.CloudConfig.DataDir.Dest)
				dataZipKey := objectKey3 + "data.zip"
				err = commonConfig.OssBucket.PutObjectFromFile(dataZipKey, commonConfig.CloudConfig.DataDir.Dest)
				if err != nil {
					c_log.GlobalLogger.Error("上传data目录压缩文件失败:", err)
				} else {
					c_log.GlobalLogger.Infof("上传data目录压缩包【%v】->【%v】成功", commonConfig.CloudConfig.DataDir.Dest, dataZipKey)
				}
			}
			commonConfig.OssMutex.Unlock()
		}
		// TODO: collect the data directory without compression.
		{
			//var filePaths []string                                                                                           // Initialize a slice to hold the file paths
			//err = filepath.WalkDir(commonConfig.CloudConfig.DataDir.Src, func(path string, d fs.DirEntry, err error) error { // Walk the directory with filepath.WalkDir
			//	if err != nil {
			//		return err // Propagate any error
			//	}
			//
			//	// Check whether it is a file (skip directories)
			//	if !d.IsDir() {
			//		filePaths = append(filePaths, path) // Append the file path to the slice
			//	}
			//	return nil
			//})
			//if err != nil {
			//	c_log.GlobalLogger.Error("Failed to scan data directory: ", err)
			//	goto outLoop
			//}
			//
			//// Upload all files without compression
			//for _, path := range filePaths {
			//	if strings.Contains(path, commonConfig.CloudConfig.DataDir.Exclude) {
			//		continue
			//	}
			//	relativePath := strings.Replace(path, commonConfig.CloudConfig.DataDir.Src, "", 1)
			//	ossKey := objectKey3 + "data/" + relativePath
			//	err = commonConfig.OssBucket.PutObjectFromFile(ossKey, path)
			//	if err != nil {
			//		c_log.GlobalLogger.Errorf("Failed to upload file in data directory【%v】->【%v】: %v", path, ossKey, err)
			//		goto outLoop
			//	}
			//}
			//commonConfig.OssMutex.Unlock()
		}

		// Increment the collection count in the database by one.
		collectNumPlus()
		// Delete all local bag files that have been uploaded.
		c_log.GlobalLogger.Infof("Finished processing window,【Label】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
		c_log.GlobalLogger.Infof("Number of pending windows: %v", len(entity.TimeWindowConsumerQueue))
		if err = util.RemoveDir(dir); err != nil {
			c_log.GlobalLogger.Error("Failed to remove local bag directory ", dir, ": ", err)
			continue outLoop
		}
		if len(entity.TimeWindowConsumerQueue) == 0 {
			c_log.GlobalLogger.Infof("已处理所有窗口,重启 record 命令。")
			ChannelKillRosRecord <- 2
			entity.ProcessingFlag = false
		}
	}
}

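// collectNumPlus notifies the cloud platform that one more time window has been collected by
// POSTing the device secret key (snCode) to the CollectNumPlus endpoint and parsing the JSON
// response into entity.Response. Failures are only logged; the caller does not retry.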
func collectNumPlus() {
	responseString, err := util.HttpPostJsonWithHeaders(
		commonConfig.CloudConfig.CollectNumPlus.Url,
		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
		map[string]string{
			"snCode": commonConfig.LocalConfig.SecretKey,
		},
	)
	if err != nil {
		c_log.GlobalLogger.Error("Failed to send HTTP request to increment the collection count: ", err)
		return
	}
	// Parse the JSON response into the Response struct.
	var resp entity.Response
	err = json.Unmarshal([]byte(responseString), &resp)
	if err != nil {
		c_log.GlobalLogger.Error("解析修改采集数量结果失败:", err)
	}
}