package service

import (
	"cicv-data-closedloop/amd64/dispatch_server/package/domain"
	"cicv-data-closedloop/amd64/dispatch_server/package/entity"
	"cicv-data-closedloop/amd64/dispatch_server/package/global"
	"cicv-data-closedloop/amd64/dispatch_server/package/infra"
	"cicv-data-closedloop/amd64/dispatch_server/package/util"
	"encoding/json"
	"errors"
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"path/filepath"
	"strconv"
	"strings"
	"time"
)

/*
 负责处理用户等待队列中的任务
 负责运行集群等待队列中的任务
*/

// 判断用户等待队列中的任务是否可以加入到集群等待队列
func RunWaitingUser() {
	infra.GlobalLogger.Infof("启动【用户等待队列】监控进程。")
	for {
		time.Sleep(2 * time.Second)
		global.RunTaskMutex.Lock()
		// 获取Redis列表中的值
		taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
		if err != nil {
			infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
			continue
		}

		for _, taskCacheJson := range taskCacheJsons {
			taskCache, err := JsonToTaskCache(taskCacheJson)
			if err != nil {
				infra.GlobalLogger.Error(err)
				continue
			}
			userId := taskCache.UserId
			userParallelism := taskCache.UserParallelism
			algorithmObjectKey := taskCache.AlgorithmObjectKey
			task := taskCache.Task
			// 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
			if domain.CanRunUser(userId, userParallelism) { // 可以运行
				err = domain.AddWaitingCluster(userId, userParallelism, algorithmObjectKey, task)
				if err != nil {
					infra.GlobalLogger.Error(err)
					continue
				}
				err = domain.DeleteWaitingUser(task.Info.TaskId)
				if err != nil {
					infra.GlobalLogger.Error(err)
					continue
				}
			}
		}
		global.RunTaskMutex.Unlock()
	}
}

// 集群等待队列中的任务判断是否可以加入集群运行队列
func RunWaitingCluster() {
	infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。")
	for {
		var algorithmTarName string
		var algorithmTarPath string
		var algorithmImageName string

		time.Sleep(2 * time.Second)
		global.GpuNodeListMutex.Lock()
		// 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
		can, gpuNode, err := domain.CanRunCluster()
		if err != nil {
			infra.GlobalLogger.Error(err)
			global.GpuNodeListMutex.Unlock()
			continue
		}
		var firstTaskCache entity.TaskCache
		if can {
			//infra.GlobalLogger.Infof("节点 %v 有剩余并行度。", gpuNode)
			// 判断是否有待运行的任务
			waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
			if waitingClusterNumber == 0 {
				//infra.GlobalLogger.Info("集群没有等待运行的任务。")
				global.GpuNodeListMutex.Unlock()
				continue
			} else {
				infra.GlobalLogger.Infof("集群存在 %v 个等待运行的任务。", waitingClusterNumber)
			}

			// 取出但不移除
			{
				firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result()
				if err != nil {
					infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
					global.GpuNodeListMutex.Unlock()
					continue
				}
				firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
				if err != nil {
					infra.GlobalLogger.Error(err)
					global.GpuNodeListMutex.Unlock()
					continue
				}
			}
			// --------------- 下载算法 ---------------
			{
				infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey)
				algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
				algorithmTarPath = infra.ApplicationYaml.K8s.AlgorithmTarTempDir + util.NewShortUUID() + "/" + algorithmTarName
				_ = util.CreateParentDir(algorithmTarPath)
				algorithmImageName = infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
				_ = infra.GlobalOssBucket.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
				if err != nil {
					infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
					time.Sleep(time.Duration(2) * time.Second)
					global.GpuNodeListMutex.Unlock()
					continue
				}
				infra.GlobalLogger.Infof("下载算法 %v 成功。", firstTaskCache.AlgorithmObjectKey)
			}
		} else {
			infra.GlobalLogger.Infof("集群没有剩余并行度。")
			global.GpuNodeListMutex.Unlock()
			continue
		}
		global.GpuNodeListMutex.Unlock()
		// 获取项目ID
		projectId := firstTaskCache.Task.Info.ProjectId
		offsetKey := "offset:" + projectId
		offset := 0

		// 根据项目ID获取偏移量
		val, err := infra.GlobalRedisClient.Get(offsetKey).Result()
		if err != nil {
			infra.GlobalLogger.Infof("偏移量键 %v 不存在,初始化设置为 0。", offsetKey)
			err = infra.GlobalRedisClient.Set(offsetKey, 0, 0).Err()
			if err != nil {
				infra.GlobalLogger.Infof("偏移量键值对 %v 初始化失败,错误信息为: %v", offsetKey, err)
				continue
			}
		} else {
			offset, err = strconv.Atoi(val)
			if err != nil {
				infra.GlobalLogger.Infof("字符串 %v 转整数失败,错误信息为: %v", val, err)
				continue
			}
			infra.GlobalLogger.Infof("当前任务使用偏移量【%v】", offset)
		}
		// 取出偏移量后将缓存中的加一,给下个任务使用。
		_, err = infra.GlobalRedisClient.Incr(offsetKey).Result()
		if err != nil {
			infra.GlobalLogger.Infof("偏移量 %v 加一失败,错误信息为: %v", offsetKey, err)
			continue
		}
		infra.GlobalLogger.Infof("偏移量【%v】加一给下个任务使用。", offsetKey)

		// --------------- 发送 kafka 消息(获取偏移量和分区) ---------------
		// 获取任务消息转json
		// todo 将摄像头写死一个参数
		firstTaskCache.Task.Vehicle.Sensors.Camera = []entity.SensorCamera{
			{
				Sensor: entity.Sensor{
					SensorName: "custom_camera",
					SensorNear: 1.0,
					SensorFar:  1500.0,
					SensorX:    -15000.0,
					SensorY:    0.0,
					SensorZ:    5000.0,
					SensorH:    0.0,
					SensorP:    10.0,
					SensorR:    0.0,
				},
				SensorForH:       45,
				SensorForV:       27,
				SensorResolution: "480*270",
				SensorFrameRate:  25,
			},
		}
		taskJson, err := TaskToJson(firstTaskCache.Task)
		if err != nil {
			infra.GlobalLogger.Error(err)
			continue
		}
		topic := projectId
		// 创建一个Message,并指定分区为0
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: infra.ApplicationYaml.Kafka.Partition, Offset: kafka.Offset(offset)},
			Value:          []byte(taskJson),
		}
		// 发送消息,并处理结果
		err = infra.GlobalKafkaProducer.Produce(msg, nil)
		if err != nil {
			infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err)
			continue
		}
		infra.GlobalLogger.Infof("发送任务消息成功,话题为【%v】,偏移量为【%v】。", topic, offset)

		// 导入算法
		_, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageName)
		_, s, err = util.Execute("docker", "push", algorithmImageName)
		if err != nil {
			infra.GlobalLogger.Errorf("导入算法镜像 %v 为 %v 失败,执行结果为:%v,错误信息为:%v", algorithmTarPath, algorithmImageName, s, err)
			time.Sleep(time.Duration(2) * time.Second)
			continue
		}
		infra.GlobalLogger.Infof("导入算法镜像 %v 为 %v 成功,执行结果为:%v", algorithmTarPath, algorithmImageName, s)
		err = util.RemoveFile(algorithmTarPath)
		if err != nil {
			infra.GlobalLogger.Errorf("删除算法镜像文件 %v 失败,错误信息为:%v", algorithmTarPath, err)
		}
		// --------------- 启动 k8s pod ---------------
		podName := "project-" + projectId + "-" + util.NewShortUUID()
		namespaceName := infra.ApplicationYaml.K8s.NamespaceName
		nodeName := gpuNode.Hostname
		restParallelism := gpuNode.Parallelism
		vtdContainer := "vtd-" + projectId
		algorithmContainer := "algorithm-" + projectId
		vtdImage := infra.ApplicationYaml.K8s.VtdImage
		// 2 生成模板文件名称
		podYaml := nodeName + "#" + podName + ".yaml"
		// 3 模板yaml存储路径
		yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml
		// 4 模板yaml备份路径
		yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml
		fmt.Println(yamlPath, yamlPathBak)
		// 5
		podString, err := util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYaml)
		if err != nil {
			infra.GlobalLogger.Error(err)
			continue
		}
		podString = strings.Replace(podString, "pod-name", podName, -1)
		podString = strings.Replace(podString, "namespace-name", namespaceName, -1)
		podString = strings.Replace(podString, "node-name", nodeName, -1)
		podString = strings.Replace(podString, "algorithm-image", algorithmImageName, -1)
		podString = strings.Replace(podString, "vtd-container", vtdContainer, -1)
		podString = strings.Replace(podString, "vtd-image", vtdImage, -1)
		podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommand, -1)
		podString = strings.Replace(podString, "platform-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
		podString = strings.Replace(podString, "simulation-cloud-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
		podString = strings.Replace(podString, "platform-type", "\""+infra.ApplicationYaml.K8s.PlatformType+"\"", -1)
		podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.Oss.Type, -1)
		podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.Oss.Endpoint, -1) // 不带http://前缀
		podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.Oss.AccessKeyId, -1)
		podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.Oss.AccessKeySecret, -1)
		podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.Oss.BucketName, -1)
		podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Broker, -1)
		podString = strings.Replace(podString, "kafka-topic", projectId, -1)
		podString = strings.Replace(podString, "kafka-partition", "\""+util.ToString(infra.ApplicationYaml.Kafka.Partition)+"\"", -1)
		podString = strings.Replace(podString, "kafka-offset", "\""+util.ToString(offset)+"\"", -1)
		podString = strings.Replace(podString, "cpu-order", "\""+util.ToString(restParallelism-1)+"\"", -1) // cpu编号是剩余并行度-1
		podString = strings.Replace(podString, "algorithm-container", algorithmContainer, -1)

		// --------------- 保存成文件
		err = util.WriteFile(podString, yamlPath)
		err = util.WriteFile(podString, yamlPathBak)
		if err != nil {
			infra.GlobalLogger.Error("保存yaml字符串失败,错误信息为", err)
			continue
		}
		infra.GlobalLogger.Infof("保存yaml文件到执行路径【%v】和备份路径【%v】", yamlPath, yamlPathBak)
		// --------------- 启动 pod
		_, s2, err := util.Execute("kubectl", "apply", "-f", yamlPath)
		if err != nil {
			infra.GlobalLogger.Errorf("启动pod失败,执行结果为 %v,错误信息为 %v", s2, err)
			continue
		}
		infra.GlobalLogger.Errorf("启动pod成功,执行结果为 %v。", s2)
		// 收尾
		{
			// --------------- 添加到运行队列
			err = domain.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
			if err != nil {
				infra.GlobalLogger.Error(err)
				global.GpuNodeListMutex.Unlock()
				continue
			}
			// --------------- 从等待队列中移除
			_, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
			if err != nil {
				infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
				continue
			}
			// --------------- 删除镜像文件
			_ = util.RemoveFile(algorithmTarPath)
		}

	}
}

func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
	// 创建一个 Person 类型的变量
	var taskCache entity.TaskCache

	// 使用 json.Unmarshal 解析 JSON 字符串到结构体
	err := json.Unmarshal([]byte(jsonData), &taskCache)
	if err != nil {
		return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
	}
	return taskCache, nil
}

func TaskToJson(task entity.Task) (string, error) {
	jsonData, err := json.Marshal(task)
	if err != nil {
		return "", errors.New("转json失败,错误信息为:" + err.Error())
	}
	return string(jsonData), nil
}