run_task.go 10 KB


  1. package service
  2. import (
  3. "cicv-data-closedloop/amd64/dispatch_server/package/domain"
  4. "cicv-data-closedloop/amd64/dispatch_server/package/entity"
  5. "cicv-data-closedloop/amd64/dispatch_server/package/global"
  6. "cicv-data-closedloop/amd64/dispatch_server/package/infra"
  7. "cicv-data-closedloop/amd64/dispatch_server/package/util"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "github.com/confluentinc/confluent-kafka-go/kafka"
  12. "path/filepath"
  13. "strconv"
  14. "strings"
  15. "time"
  16. )
  17. /*
  18. 负责处理用户等待队列中的任务
  19. 负责运行集群等待队列中的任务
  20. */
  21. // 判断用户等待队列中的任务是否可以加入到集群等待队列
  22. func RunWaitingUser() {
  23. for {
  24. time.Sleep(2 * time.Second)
  25. global.RunTaskMutex.Lock()
  26. // 获取Redis列表中的值
  27. taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
  28. if err != nil {
  29. infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
  30. continue
  31. }
  32. for _, taskCacheJson := range taskCacheJsons {
  33. taskCache, err := JsonToTaskCache(taskCacheJson)
  34. if err != nil {
  35. infra.GlobalLogger.Error(err)
  36. continue
  37. }
  38. userId := taskCache.UserId
  39. userParallelism := taskCache.UserParallelism
  40. algorithmObjectKey := taskCache.AlgorithmObjectKey
  41. task := taskCache.Task
  42. // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
  43. if domain.CanRunUser(userId, userParallelism) { // 可以运行
  44. err = domain.AddWaitingCluster(userId, userParallelism, algorithmObjectKey, task)
  45. if err != nil {
  46. infra.GlobalLogger.Error(err)
  47. continue
  48. }
  49. err = domain.DeleteWaitingUser(task.Info.TaskId)
  50. if err != nil {
  51. infra.GlobalLogger.Error(err)
  52. continue
  53. }
  54. }
  55. }
  56. global.RunTaskMutex.Unlock()
  57. }
  58. }
  59. // 集群等待队列中的任务判断是否可以加入集群运行队列
  60. func RunWaitingCluster() {
  61. for {
  62. time.Sleep(2 * time.Second)
  63. global.GpuNodeListMutex.Lock()
  64. // 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
  65. can, gpuNode, err := domain.CanRunCluster()
  66. if err != nil {
  67. infra.GlobalLogger.Error(err)
  68. continue
  69. }
  70. var firstTaskCache entity.TaskCache
  71. if can {
  72. infra.GlobalLogger.Infof("节点 %v 有剩余并行度。", gpuNode)
  73. // 判断是否有待运行的任务
  74. waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
  75. if waitingClusterNumber == 0 {
  76. infra.GlobalLogger.Info("集群没有等待运行的任务。")
  77. continue
  78. } else {
  79. infra.GlobalLogger.Infof("集群存在 %v 个等待运行的任务。", waitingClusterNumber)
  80. }
  81. // 取出但不移除
  82. firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result()
  83. if err != nil {
  84. infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
  85. continue
  86. }
  87. firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
  88. if err != nil {
  89. infra.GlobalLogger.Error(err)
  90. continue
  91. }
  92. err = domain.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
  93. if err != nil {
  94. infra.GlobalLogger.Error(err)
  95. continue
  96. }
  97. } else {
  98. infra.GlobalLogger.Infof("集群没有剩余并行度。")
  99. continue
  100. }
  101. global.GpuNodeListMutex.Unlock()
  102. // 获取项目ID
  103. projectId := firstTaskCache.Task.Info.ProjectId
  104. offsetKey := "offset:" + projectId
  105. offset := 0
  106. // 根据项目ID获取偏移量
  107. val, err := infra.GlobalRedisClient.Get(offsetKey).Result()
  108. if err != nil {
  109. infra.GlobalLogger.Infof("偏移量键 %v 不存在,初始化设置为 0。", offsetKey)
  110. err = infra.GlobalRedisClient.Set(offsetKey, 0, 0).Err()
  111. if err != nil {
  112. infra.GlobalLogger.Infof("偏移量键值对 %v 初始化失败,错误信息为: %v", offsetKey, err)
  113. continue
  114. }
  115. } else {
  116. offset, err = strconv.Atoi(val)
  117. if err != nil {
  118. infra.GlobalLogger.Infof("字符串 %v 转整数失败,错误信息为: %v", val, err)
  119. continue
  120. }
  121. }
  122. // 取出偏移量后将缓存中的加一,给下个任务使用。
  123. _, err = infra.GlobalRedisClient.Incr(offsetKey).Result()
  124. if err != nil {
  125. infra.GlobalLogger.Infof("偏移量 %v 加一失败,错误信息为: %v", offsetKey, err)
  126. continue
  127. }
  128. // --------------- 发送 kafka 消息(获取偏移量和分区) ---------------
  129. // 获取任务消息转json
  130. taskJson, err := TaskToJson(firstTaskCache.Task)
  131. if err != nil {
  132. infra.GlobalLogger.Error(err)
  133. continue
  134. }
  135. topic := projectId
  136. // 创建一个Message,并指定分区为0
  137. msg := &kafka.Message{
  138. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: infra.ApplicationYaml.Kafka.Partition, Offset: kafka.Offset(offset)},
  139. Value: []byte(taskJson),
  140. }
  141. // 发送消息,并处理结果
  142. err = infra.GlobalKafkaProducer.Produce(msg, nil)
  143. if err != nil {
  144. infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err)
  145. continue
  146. }
  147. // --------------- 下载算法 ---------------
  148. algorithmTarName := filepath.Base(firstTaskCache.AlgorithmObjectKey)
  149. algorithmTarPath := infra.ApplicationYaml.K8s.AlgorithmTarTempDir + algorithmTarName
  150. algorithmImageName := infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
  151. err = infra.GlobalOssBucket.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
  152. if err != nil {
  153. infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
  154. time.Sleep(time.Duration(2) * time.Second)
  155. continue
  156. }
  157. // 导入算法
  158. _, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageName)
  159. _, s, err = util.Execute("docker", "push", algorithmImageName)
  160. if err != nil {
  161. infra.GlobalLogger.Errorf("导入算法镜像 %v 为 %v 失败,执行结果为:%v,错误信息为:%v", algorithmTarPath, algorithmImageName, s, err)
  162. time.Sleep(time.Duration(2) * time.Second)
  163. continue
  164. }
  165. infra.GlobalLogger.Infof("导入算法镜像 %v 为 %v 成功,执行结果为:%v", algorithmTarPath, algorithmImageName, s)
  166. err = util.RemoveFile(algorithmTarPath)
  167. if err != nil {
  168. infra.GlobalLogger.Errorf("删除算法镜像文件 %v 失败,错误信息为:%v", algorithmTarPath, err)
  169. }
  170. // --------------- 启动 k8s pod ---------------
  171. podName := "project-" + projectId + "-" + util.NewShortUUID()
  172. namespaceName := infra.ApplicationYaml.K8s.NamespaceName
  173. nodeName := gpuNode.Hostname
  174. restParallelism := gpuNode.Parallelism
  175. vtdContainer := "vtd-" + projectId
  176. algorithmContainer := "algorithm-" + projectId
  177. vtdImage := infra.ApplicationYaml.K8s.VtdImage
  178. // 2 生成模板文件名称
  179. podYaml := nodeName + "#" + podName + ".yaml"
  180. // 3 模板yaml存储路径
  181. yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml
  182. // 4 模板yaml备份路径
  183. yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml
  184. fmt.Println(yamlPath, yamlPathBak)
  185. // 5
  186. podString, err := util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYaml)
  187. if err != nil {
  188. infra.GlobalLogger.Error(err)
  189. continue
  190. }
  191. podString = strings.Replace(podString, "pod-name", podName, -1)
  192. podString = strings.Replace(podString, "namespace-name", namespaceName, -1)
  193. podString = strings.Replace(podString, "node-name", nodeName, -1)
  194. podString = strings.Replace(podString, "algorithm-image", algorithmImageName, -1)
  195. podString = strings.Replace(podString, "vtd-container", vtdContainer, -1)
  196. podString = strings.Replace(podString, "vtd-image", vtdImage, -1)
  197. podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommand, -1)
  198. podString = strings.Replace(podString, "platform-ip", infra.ApplicationYaml.Web.IpPrivate, -1)
  199. podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.Oss.Type, -1)
  200. podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.Oss.Endpoint, -1) // 不带http://前缀
  201. podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.Oss.AccessKeyId, -1)
  202. podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.Oss.AccessKeySecret, -1)
  203. podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Broker, -1)
  204. podString = strings.Replace(podString, "kafka-topic", projectId, -1)
  205. podString = strings.Replace(podString, "kafka-partition", "\""+util.ToString(infra.ApplicationYaml.Kafka.Partition)+"\"", -1)
  206. podString = strings.Replace(podString, "kafka-offset", "\""+util.ToString(offset)+"\"", -1)
  207. podString = strings.Replace(podString, "cpu-order", "\""+util.ToString(restParallelism-1)+"\"", -1) // cpu编号是剩余并行度-1
  208. podString = strings.Replace(podString, "algorithm-container", algorithmContainer, -1)
  209. // --------------- 保存成文件
  210. err = util.WriteFile(podString, yamlPath)
  211. err = util.WriteFile(podString, yamlPathBak)
  212. if err != nil {
  213. infra.GlobalLogger.Error("保存yaml字符串失败,错误信息为", err)
  214. continue
  215. }
  216. // --------------- 启动 pod
  217. _, s2, err := util.Execute("kubectl", "apply", "-f", yamlPath)
  218. if err != nil {
  219. infra.GlobalLogger.Errorf("保存yaml字符串失败,执行结果为 %v,错误信息为 %v", s2, err)
  220. continue
  221. }
  222. // --------------- 移除头元素
  223. _, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
  224. if err != nil {
  225. infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
  226. continue
  227. }
  228. }
  229. }
  230. func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
  231. // 创建一个 Person 类型的变量
  232. var taskCache entity.TaskCache
  233. // 使用 json.Unmarshal 解析 JSON 字符串到结构体
  234. err := json.Unmarshal([]byte(jsonData), &taskCache)
  235. if err != nil {
  236. return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
  237. }
  238. return taskCache, nil
  239. }
  240. func TaskToJson(task entity.Task) (string, error) {
  241. jsonData, err := json.MarshalIndent(task, "", " ")
  242. if err != nil {
  243. return "", errors.New("转json失败,错误信息为:" + err.Error())
  244. }
  245. return string(jsonData), nil
  246. }