123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- package service
- import (
- "cicv-data-closedloop/amd64/dispatch_server/package/domain"
- "cicv-data-closedloop/amd64/dispatch_server/package/entity"
- "cicv-data-closedloop/amd64/dispatch_server/package/global"
- "cicv-data-closedloop/amd64/dispatch_server/package/infra"
- "cicv-data-closedloop/amd64/dispatch_server/package/util"
- "encoding/json"
- "errors"
- "fmt"
- "strings"
- "time"
- )
- /*
- 负责处理用户等待队列中的任务
- 负责运行集群等待队列中的任务
- */
- // 判断用户等待队列中的任务是否可以加入到集群等待队列
- func RunWaitingUser() {
- for {
- time.Sleep(2 * time.Second)
- global.RunTaskMutex.Lock()
- // 获取Redis列表中的值
- taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
- if err != nil {
- infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
- continue
- }
- for _, taskCacheJson := range taskCacheJsons {
- taskCache, err := JsonToTaskCache(taskCacheJson)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- userId := taskCache.UserId
- userParallelism := taskCache.UserParallelism
- task := taskCache.Task
- // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
- if domain.CanRunUser(userId, userParallelism) { // 可以运行
- err = domain.AddWaitingCluster(userId, userParallelism, task)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- err = domain.DeleteWaitingUser(task.Info.TaskId)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- }
- }
- global.RunTaskMutex.Unlock()
- }
- }
- // 集群等待队列中的任务判断是否可以加入集群运行队列
- func RunWaitingCluster() {
- for {
- time.Sleep(2 * time.Second)
- global.GpuNodeListMutex.Lock()
- // 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
- can, gpuNode, err := domain.CanRunCluster()
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- var firstTaskCache entity.TaskCache
- if can {
- // 移除并取出
- firstTaskCacheJson, err := infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
- if err != nil {
- infra.GlobalLogger.Error("移除并取出集群等待队列中的头元素报错,错误信息为:", err)
- continue
- }
- firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- err = domain.AddRunningCluster(firstTaskCache)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- }
- global.GpuNodeListMutex.Unlock()
- // --------------- 启动 k8s pod ---------------
- projectId := firstTaskCache.Task.Info.ProjectId
- nodeName := gpuNode.Hostname
- // 1 生成 podName
- podName := "project-" + projectId + "-" + util.NewShortUUID()
- // 2 生成模板文件名称
- podYaml := nodeName + "#" + podName + ".yaml"
- // 3 模板yaml存储路径
- yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml
- // 4 模板yaml备份路径
- yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml
- // 5
- podString, err := util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYaml)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- podString = strings.Replace(podString, "vtd-container-name", "vtd-"+projectId+"-"+nodeName, 1)
- podString = strings.Replace(podString, "cicv-data-closedloop-ip", "vtd-"+projectId+"-"+nodeName, 1)
- // todo golang 连接 kafka
- podString = strings.Replace(podString, "kafka-ip", "vtd-"+projectId+"-"+nodeName, 1)
- }
- }
- func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
- // 创建一个 Person 类型的变量
- var taskCache entity.TaskCache
- // 使用 json.Unmarshal 解析 JSON 字符串到结构体
- err := json.Unmarshal([]byte(jsonData), &taskCache)
- if err != nil {
- return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
- }
- return taskCache, nil
- }
|