run_task.go 8.0 KB


  1. package service
  2. import (
  3. "cicv-data-closedloop/amd64/dispatch_server/package/domain"
  4. "cicv-data-closedloop/amd64/dispatch_server/package/entity"
  5. "cicv-data-closedloop/amd64/dispatch_server/package/global"
  6. "cicv-data-closedloop/amd64/dispatch_server/package/infra"
  7. "cicv-data-closedloop/amd64/dispatch_server/package/util"
  8. "cicv-data-closedloop/common/config/c_log"
  9. "encoding/json"
  10. "errors"
  11. "fmt"
  12. "github.com/confluentinc/confluent-kafka-go/kafka"
  13. "path/filepath"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
  18. /*
  19. 负责处理用户等待队列中的任务
  20. 负责运行集群等待队列中的任务
  21. */
  22. // 判断用户等待队列中的任务是否可以加入到集群等待队列
  23. func RunWaitingUser() {
  24. for {
  25. time.Sleep(2 * time.Second)
  26. global.RunTaskMutex.Lock()
  27. // 获取Redis列表中的值
  28. taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
  29. if err != nil {
  30. infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
  31. continue
  32. }
  33. for _, taskCacheJson := range taskCacheJsons {
  34. taskCache, err := JsonToTaskCache(taskCacheJson)
  35. if err != nil {
  36. infra.GlobalLogger.Error(err)
  37. continue
  38. }
  39. userId := taskCache.UserId
  40. userParallelism := taskCache.UserParallelism
  41. algorithmObjectKey := taskCache.AlgorithmObjectKey
  42. task := taskCache.Task
  43. // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
  44. if domain.CanRunUser(userId, userParallelism) { // 可以运行
  45. err = domain.AddWaitingCluster(userId, userParallelism, algorithmObjectKey, task)
  46. if err != nil {
  47. infra.GlobalLogger.Error(err)
  48. continue
  49. }
  50. err = domain.DeleteWaitingUser(task.Info.TaskId)
  51. if err != nil {
  52. infra.GlobalLogger.Error(err)
  53. continue
  54. }
  55. }
  56. }
  57. global.RunTaskMutex.Unlock()
  58. }
  59. }
  60. // 集群等待队列中的任务判断是否可以加入集群运行队列
  61. func RunWaitingCluster() {
  62. for {
  63. time.Sleep(2 * time.Second)
  64. global.GpuNodeListMutex.Lock()
  65. // 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
  66. can, gpuNode, err := domain.CanRunCluster()
  67. if err != nil {
  68. infra.GlobalLogger.Error(err)
  69. continue
  70. }
  71. var firstTaskCache entity.TaskCache
  72. if can {
  73. // 取出但不移除
  74. firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result()
  75. if err != nil {
  76. infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
  77. continue
  78. }
  79. firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
  80. if err != nil {
  81. infra.GlobalLogger.Error(err)
  82. continue
  83. }
  84. err = domain.AddRunningCluster(firstTaskCache)
  85. if err != nil {
  86. infra.GlobalLogger.Error(err)
  87. continue
  88. }
  89. }
  90. global.GpuNodeListMutex.Unlock()
  91. // 获取项目ID
  92. projectId := firstTaskCache.Task.Info.ProjectId
  93. offsetKey := "offset:" + projectId
  94. offset := 0
  95. // 根据项目ID获取偏移量
  96. val, err := infra.GlobalRedisClient.Get(offsetKey).Result()
  97. if err != nil {
  98. infra.GlobalLogger.Infof("偏移量键 %v 不存在,初始化设置为 0。", offsetKey)
  99. err = infra.GlobalRedisClient.Set(offsetKey, 0, 0).Err()
  100. if err != nil {
  101. infra.GlobalLogger.Infof("偏移量键值对 %v 初始化失败,错误信息为: %v", offsetKey, err)
  102. continue
  103. }
  104. } else {
  105. offset, err = strconv.Atoi(val)
  106. if err != nil {
  107. infra.GlobalLogger.Infof("字符串 %v 转整数失败,错误信息为: %v", val, err)
  108. continue
  109. }
  110. }
  111. // 取出偏移量后将缓存中的加一,给下个任务使用。
  112. _, err = infra.GlobalRedisClient.Incr(offsetKey).Result()
  113. if err != nil {
  114. infra.GlobalLogger.Infof("偏移量 %v 加一失败,错误信息为: %v", offsetKey, err)
  115. continue
  116. }
  117. // --------------- 发送 kafka 消息(获取偏移量和分区) ---------------
  118. // 获取任务消息转json
  119. taskJson, err := TaskToJson(firstTaskCache.Task)
  120. if err != nil {
  121. infra.GlobalLogger.Error(err)
  122. continue
  123. }
  124. topic := projectId
  125. value := []byte(taskJson)
  126. // 创建一个Message,并指定分区为0
  127. msg := &kafka.Message{
  128. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: kafka.Offset(offset)},
  129. Value: value,
  130. }
  131. // 发送消息,并处理结果
  132. err = infra.GlobalKafkaProducer.Produce(msg, nil)
  133. if err != nil {
  134. infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err)
  135. continue
  136. }
  137. // --------------- 下载算法 ---------------
  138. algorithmTarName := filepath.Base(firstTaskCache.AlgorithmObjectKey)
  139. algorithmTarPath := infra.ApplicationYaml.K8s.AlgorithmTarTempDir + algorithmTarName
  140. algorithmImageName := infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
  141. err = infra.GlobalOssBucket.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
  142. if err != nil {
  143. c_log.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
  144. time.Sleep(time.Duration(2) * time.Second)
  145. continue
  146. }
  147. // 导入算法
  148. _, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageName)
  149. _, s, err = util.Execute("docker", "push", algorithmImageName)
  150. if err != nil {
  151. c_log.GlobalLogger.Errorf("导入算法镜像 %v 为 %v 失败,执行结果为:%v,错误信息为:%v", algorithmTarPath, algorithmImageName, s, err)
  152. time.Sleep(time.Duration(2) * time.Second)
  153. continue
  154. }
  155. c_log.GlobalLogger.Infof("导入算法镜像 %v 为 %v 成功,执行结果为:%v", algorithmTarPath, algorithmImageName, s)
  156. err = util.RemoveFile(algorithmTarPath)
  157. if err != nil {
  158. c_log.GlobalLogger.Errorf("删除算法镜像文件 %v 失败,错误信息为:%v", algorithmTarPath, err)
  159. }
  160. // --------------- 启动 k8s pod ---------------
  161. nodeName := gpuNode.Hostname
  162. // 1 生成 podName
  163. podName := "project-" + projectId + "-" + util.NewShortUUID()
  164. // 2 生成模板文件名称
  165. podYaml := nodeName + "#" + podName + ".yaml"
  166. // 3 模板yaml存储路径
  167. yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml
  168. // 4 模板yaml备份路径
  169. yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml
  170. fmt.Println(yamlPath, yamlPathBak)
  171. // 5
  172. podString, err := util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYaml)
  173. if err != nil {
  174. infra.GlobalLogger.Error(err)
  175. continue
  176. }
  177. podString = strings.Replace(podString, "vtd-container-name", "vtd-"+projectId+"-"+nodeName, 1)
  178. podString = strings.Replace(podString, "cicv-data-closedloop-ip", infra.ApplicationYaml.Web.IpPrivate, 1)
  179. podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Brokers[0], 1)
  180. podString = strings.Replace(podString, "kafka-topic", projectId, 1)
  181. // 发送消息之后会拿到消息的分区和偏移量
  182. podString = strings.Replace(podString, "kafka-partition", projectId, 1)
  183. podString = strings.Replace(podString, "kafka-offset", projectId, 1)
  184. // todo cpu编号是剩余并行度减一
  185. // --------------- 移除头元素
  186. _, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
  187. if err != nil {
  188. infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
  189. continue
  190. }
  191. }
  192. }
  193. func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
  194. // 创建一个 Person 类型的变量
  195. var taskCache entity.TaskCache
  196. // 使用 json.Unmarshal 解析 JSON 字符串到结构体
  197. err := json.Unmarshal([]byte(jsonData), &taskCache)
  198. if err != nil {
  199. return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
  200. }
  201. return taskCache, nil
  202. }
  203. func TaskToJson(task entity.Task) (string, error) {
  204. jsonData, err := json.MarshalIndent(task, "", " ")
  205. if err != nil {
  206. return "", errors.New("转json失败,错误信息为:" + err.Error())
  207. }
  208. return string(jsonData), nil
  209. }