package service import ( "cicv-data-closedloop/amd64/dispatch_server/package/domain" "cicv-data-closedloop/amd64/dispatch_server/package/entity" "cicv-data-closedloop/amd64/dispatch_server/package/global" "cicv-data-closedloop/amd64/dispatch_server/package/infra" "encoding/json" "errors" "fmt" "time" ) /* 负责处理用户等待队列中的任务 负责运行集群等待队列中的任务 */ // 判断用户等待队列中的任务是否可以加入到集群等待队列 func runWaitingUser() { for { time.Sleep(2 * time.Second) global.RunTaskMutex.Lock() // 获取Redis列表中的值 taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result() if err != nil { infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err) continue } for _, taskCacheJson := range taskCacheJsons { taskCache, err := JsonToTaskCache(taskCacheJson) if err != nil { infra.GlobalLogger.Error(err) continue } userId := taskCache.UserId userParallelism := taskCache.UserParallelism task := taskCache.Task // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动 if domain.CanRun(userId, userParallelism) { // 可以运行 err = domain.AddWaitingCluster(userId, userParallelism, task) if err != nil { infra.GlobalLogger.Error(err) continue } err = domain.DeleteWaitingUser(task.Info.TaskId) if err != nil { infra.GlobalLogger.Error(err) continue } } } global.RunTaskMutex.Unlock() } } // 集群等待队列中的任务判断是否可以加入集群运行队列 func runWaitingCluster() { // 2 判断集群并行度 //{ // // 1 判断是否可以运行,可以运行的加入集群运行队列,不能运行的加入集群等待队列 // TaskQueueRunningClusterNumber, _ := infra.GlobalRedisClient.LLen(KeyTaskQueueRunningCluster).Result() // // 2 判断是否可以运行,可以运行的执行运行命令并加入运行队列,不能运行的加入集群等待队列 //} } func JsonToTaskCache(jsonData string) (entity.TaskCache, error) { // 创建一个 Person 类型的变量 var taskCache entity.TaskCache // 使用 json.Unmarshal 解析 JSON 字符串到结构体 err := json.Unmarshal([]byte(jsonData), &taskCache) if err != nil { return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err)) } return taskCache, nil }