run_task.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package service
  2. import (
  3. "cicv-data-closedloop/amd64/dispatch_server/package/domain"
  4. "cicv-data-closedloop/amd64/dispatch_server/package/entity"
  5. "cicv-data-closedloop/amd64/dispatch_server/package/global"
  6. "cicv-data-closedloop/amd64/dispatch_server/package/infra"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "time"
  11. )
  12. /*
  13. 负责处理用户等待队列中的任务
  14. 负责运行集群等待队列中的任务
  15. */
  16. // 判断用户等待队列中的任务是否可以加入到集群等待队列
  17. func runWaitingUser() {
  18. for {
  19. time.Sleep(2 * time.Second)
  20. global.RunTaskMutex.Lock()
  21. // 获取Redis列表中的值
  22. taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
  23. if err != nil {
  24. infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
  25. continue
  26. }
  27. for _, taskCacheJson := range taskCacheJsons {
  28. taskCache, err := JsonToTaskCache(taskCacheJson)
  29. if err != nil {
  30. infra.GlobalLogger.Error(err)
  31. continue
  32. }
  33. userId := taskCache.UserId
  34. userParallelism := taskCache.UserParallelism
  35. task := taskCache.Task
  36. // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
  37. if domain.CanRun(userId, userParallelism) { // 可以运行
  38. err = domain.AddWaitingCluster(userId, userParallelism, task)
  39. if err != nil {
  40. infra.GlobalLogger.Error(err)
  41. continue
  42. }
  43. err = domain.DeleteWaitingUser(task.Info.TaskId)
  44. if err != nil {
  45. infra.GlobalLogger.Error(err)
  46. continue
  47. }
  48. }
  49. }
  50. global.RunTaskMutex.Unlock()
  51. }
  52. }
  53. // 集群等待队列中的任务判断是否可以加入集群运行队列
  54. func runWaitingCluster() {
  55. // 2 判断集群并行度
  56. //{
  57. // // 1 判断是否可以运行,可以运行的加入集群运行队列,不能运行的加入集群等待队列
  58. // TaskQueueRunningClusterNumber, _ := infra.GlobalRedisClient.LLen(KeyTaskQueueRunningCluster).Result()
  59. // // 2 判断是否可以运行,可以运行的执行运行命令并加入运行队列,不能运行的加入集群等待队列
  60. //}
  61. }
  62. func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
  63. // 创建一个 Person 类型的变量
  64. var taskCache entity.TaskCache
  65. // 使用 json.Unmarshal 解析 JSON 字符串到结构体
  66. err := json.Unmarshal([]byte(jsonData), &taskCache)
  67. if err != nil {
  68. return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
  69. }
  70. return taskCache, nil
  71. }