12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- package service
- import (
- "cicv-data-closedloop/amd64/dispatch_server/package/domain"
- "cicv-data-closedloop/amd64/dispatch_server/package/entity"
- "cicv-data-closedloop/amd64/dispatch_server/package/global"
- "cicv-data-closedloop/amd64/dispatch_server/package/infra"
- "encoding/json"
- "errors"
- "fmt"
- "time"
- )
- /*
- 负责处理用户等待队列中的任务
- 负责运行集群等待队列中的任务
- */
- // 判断用户等待队列中的任务是否可以加入到集群等待队列
- func runWaitingUser() {
- for {
- time.Sleep(2 * time.Second)
- global.RunTaskMutex.Lock()
- // 获取Redis列表中的值
- taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
- if err != nil {
- infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
- continue
- }
- for _, taskCacheJson := range taskCacheJsons {
- taskCache, err := JsonToTaskCache(taskCacheJson)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- userId := taskCache.UserId
- userParallelism := taskCache.UserParallelism
- task := taskCache.Task
- // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
- if domain.CanRun(userId, userParallelism) { // 可以运行
- err = domain.AddWaitingCluster(userId, userParallelism, task)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- err = domain.DeleteWaitingUser(task.Info.TaskId)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- }
- }
- global.RunTaskMutex.Unlock()
- }
- }
- // 集群等待队列中的任务判断是否可以加入集群运行队列
- func runWaitingCluster() {
- // 2 判断集群并行度
- //{
- // // 1 判断是否可以运行,可以运行的加入集群运行队列,不能运行的加入集群等待队列
- // TaskQueueRunningClusterNumber, _ := infra.GlobalRedisClient.LLen(KeyTaskQueueRunningCluster).Result()
- // // 2 判断是否可以运行,可以运行的执行运行命令并加入运行队列,不能运行的加入集群等待队列
- //}
- }
- func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
- // 创建一个 Person 类型的变量
- var taskCache entity.TaskCache
- // 使用 json.Unmarshal 解析 JSON 字符串到结构体
- err := json.Unmarshal([]byte(jsonData), &taskCache)
- if err != nil {
- return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
- }
- return taskCache, nil
- }
|