run_task.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package service
  2. import (
  3. "cicv-data-closedloop/amd64/dispatch_server/package/domain"
  4. "cicv-data-closedloop/amd64/dispatch_server/package/entity"
  5. "cicv-data-closedloop/amd64/dispatch_server/package/global"
  6. "cicv-data-closedloop/amd64/dispatch_server/package/infra"
  7. "cicv-data-closedloop/amd64/dispatch_server/package/util"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "strings"
  12. "time"
  13. )
  14. /*
  15. 负责处理用户等待队列中的任务
  16. 负责运行集群等待队列中的任务
  17. */
  18. // 判断用户等待队列中的任务是否可以加入到集群等待队列
  19. func RunWaitingUser() {
  20. for {
  21. time.Sleep(2 * time.Second)
  22. global.RunTaskMutex.Lock()
  23. // 获取Redis列表中的值
  24. taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
  25. if err != nil {
  26. infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
  27. continue
  28. }
  29. for _, taskCacheJson := range taskCacheJsons {
  30. taskCache, err := JsonToTaskCache(taskCacheJson)
  31. if err != nil {
  32. infra.GlobalLogger.Error(err)
  33. continue
  34. }
  35. userId := taskCache.UserId
  36. userParallelism := taskCache.UserParallelism
  37. task := taskCache.Task
  38. // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
  39. if domain.CanRunUser(userId, userParallelism) { // 可以运行
  40. err = domain.AddWaitingCluster(userId, userParallelism, task)
  41. if err != nil {
  42. infra.GlobalLogger.Error(err)
  43. continue
  44. }
  45. err = domain.DeleteWaitingUser(task.Info.TaskId)
  46. if err != nil {
  47. infra.GlobalLogger.Error(err)
  48. continue
  49. }
  50. }
  51. }
  52. global.RunTaskMutex.Unlock()
  53. }
  54. }
  55. // 集群等待队列中的任务判断是否可以加入集群运行队列
  56. func RunWaitingCluster() {
  57. for {
  58. time.Sleep(2 * time.Second)
  59. global.GpuNodeListMutex.Lock()
  60. // 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
  61. can, gpuNode, err := domain.CanRunCluster()
  62. if err != nil {
  63. infra.GlobalLogger.Error(err)
  64. continue
  65. }
  66. var firstTaskCache entity.TaskCache
  67. if can {
  68. // 移除并取出
  69. firstTaskCacheJson, err := infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
  70. if err != nil {
  71. infra.GlobalLogger.Error("移除并取出集群等待队列中的头元素报错,错误信息为:", err)
  72. continue
  73. }
  74. firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
  75. if err != nil {
  76. infra.GlobalLogger.Error(err)
  77. continue
  78. }
  79. err = domain.AddRunningCluster(firstTaskCache)
  80. if err != nil {
  81. infra.GlobalLogger.Error(err)
  82. continue
  83. }
  84. }
  85. global.GpuNodeListMutex.Unlock()
  86. // --------------- 发送 kafka 消息(获取偏移量和分区) ---------------
  87. // --------------- 启动 k8s pod ---------------
  88. projectId := firstTaskCache.Task.Info.ProjectId
  89. nodeName := gpuNode.Hostname
  90. // 1 生成 podName
  91. podName := "project-" + projectId + "-" + util.NewShortUUID()
  92. // 2 生成模板文件名称
  93. podYaml := nodeName + "#" + podName + ".yaml"
  94. // 3 模板yaml存储路径
  95. yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml
  96. // 4 模板yaml备份路径
  97. yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml
  98. fmt.Println(yamlPath, yamlPathBak)
  99. // 5
  100. podString, err := util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYaml)
  101. if err != nil {
  102. infra.GlobalLogger.Error(err)
  103. continue
  104. }
  105. podString = strings.Replace(podString, "vtd-container-name", "vtd-"+projectId+"-"+nodeName, 1)
  106. podString = strings.Replace(podString, "cicv-data-closedloop-ip", infra.ApplicationYaml.Web.IpPrivate, 1)
  107. podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Brokers[0], 1)
  108. podString = strings.Replace(podString, "kafka-topic", projectId, 1)
  109. // 发送消息之后会拿到消息的分区和偏移量
  110. podString = strings.Replace(podString, "kafka-partition", projectId, 1)
  111. podString = strings.Replace(podString, "kafka-offset", projectId, 1)
  112. // todo cpu编号是剩余并行度减一
  113. }
  114. }
  115. func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
  116. // 创建一个 Person 类型的变量
  117. var taskCache entity.TaskCache
  118. // 使用 json.Unmarshal 解析 JSON 字符串到结构体
  119. err := json.Unmarshal([]byte(jsonData), &taskCache)
  120. if err != nil {
  121. return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
  122. }
  123. return taskCache, nil
  124. }