package service import ( "cicv-data-closedloop/amd64/dispatch_server/package/domain" "cicv-data-closedloop/amd64/dispatch_server/package/entity" "cicv-data-closedloop/amd64/dispatch_server/package/global" "cicv-data-closedloop/amd64/dispatch_server/package/infra" "cicv-data-closedloop/amd64/dispatch_server/package/util" "encoding/json" "errors" "fmt" "strings" "time" ) /* 负责处理用户等待队列中的任务 负责运行集群等待队列中的任务 */ // 判断用户等待队列中的任务是否可以加入到集群等待队列 func RunWaitingUser() { for { time.Sleep(2 * time.Second) global.RunTaskMutex.Lock() // 获取Redis列表中的值 taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result() if err != nil { infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err) continue } for _, taskCacheJson := range taskCacheJsons { taskCache, err := JsonToTaskCache(taskCacheJson) if err != nil { infra.GlobalLogger.Error(err) continue } userId := taskCache.UserId userParallelism := taskCache.UserParallelism task := taskCache.Task // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动 if domain.CanRunUser(userId, userParallelism) { // 可以运行 err = domain.AddWaitingCluster(userId, userParallelism, task) if err != nil { infra.GlobalLogger.Error(err) continue } err = domain.DeleteWaitingUser(task.Info.TaskId) if err != nil { infra.GlobalLogger.Error(err) continue } } } global.RunTaskMutex.Unlock() } } // 集群等待队列中的任务判断是否可以加入集群运行队列 func RunWaitingCluster() { for { time.Sleep(2 * time.Second) global.GpuNodeListMutex.Lock() // 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动 can, gpuNode, err := domain.CanRunCluster() if err != nil { infra.GlobalLogger.Error(err) continue } var firstTaskCache entity.TaskCache if can { // 移除并取出 firstTaskCacheJson, err := infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result() if err != nil { infra.GlobalLogger.Error("移除并取出集群等待队列中的头元素报错,错误信息为:", err) continue } firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson) if err != nil { infra.GlobalLogger.Error(err) continue } err = domain.AddRunningCluster(firstTaskCache) if err != nil { infra.GlobalLogger.Error(err) continue } } global.GpuNodeListMutex.Unlock() // --------------- 启动 k8s pod --------------- projectId := firstTaskCache.Task.Info.ProjectId nodeName := gpuNode.Hostname // 1 生成 podName podName := "project-" + projectId + "-" + util.NewShortUUID() // 2 生成模板文件名称 podYaml := nodeName + "#" + podName + ".yaml" // 3 模板yaml存储路径 yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml // 4 模板yaml备份路径 yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml // 5 podString, err := util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYaml) if err != nil { infra.GlobalLogger.Error(err) continue } podString = strings.Replace(podString, "vtd-container-name", "vtd-"+projectId+"-"+nodeName, 1) podString = strings.Replace(podString, "cicv-data-closedloop-ip", "vtd-"+projectId+"-"+nodeName, 1) // todo golang 连接 kafka podString = strings.Replace(podString, "kafka-ip", "vtd-"+projectId+"-"+nodeName, 1) } } func JsonToTaskCache(jsonData string) (entity.TaskCache, error) { // 创建一个 Person 类型的变量 var taskCache entity.TaskCache // 使用 json.Unmarshal 解析 JSON 字符串到结构体 err := json.Unmarshal([]byte(jsonData), &taskCache) if err != nil { return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err)) } return taskCache, nil }