run_task.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. package service
  2. import (
  3. "cicv-data-closedloop/amd64/dispatch_server/package/domain"
  4. "cicv-data-closedloop/amd64/dispatch_server/package/entity"
  5. "cicv-data-closedloop/amd64/dispatch_server/package/global"
  6. "cicv-data-closedloop/amd64/dispatch_server/package/infra"
  7. "cicv-data-closedloop/amd64/dispatch_server/package/util"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "github.com/confluentinc/confluent-kafka-go/kafka"
  12. "path/filepath"
  13. "strconv"
  14. "strings"
  15. "time"
  16. )
/*
Handles the tasks held in the user waiting queue.
Runs the tasks held in the cluster waiting queue.
*/
  21. // 判断用户等待队列中的任务是否可以加入到集群等待队列
  22. func RunWaitingUser() {
  23. infra.GlobalLogger.Infof("启动【用户等待队列】监控进程。")
  24. for {
  25. time.Sleep(2 * time.Second)
  26. global.RunTaskMutex.Lock()
  27. // 获取Redis列表中的值
  28. taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
  29. if err != nil {
  30. infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
  31. continue
  32. }
  33. for _, taskCacheJson := range taskCacheJsons {
  34. taskCache, err := JsonToTaskCache(taskCacheJson)
  35. if err != nil {
  36. infra.GlobalLogger.Error(err)
  37. continue
  38. }
  39. userId := taskCache.UserId
  40. userParallelism := taskCache.UserParallelism
  41. algorithmObjectKey := taskCache.AlgorithmObjectKey
  42. task := taskCache.Task
  43. // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
  44. if domain.CanRunUser(userId, userParallelism) { // 可以运行
  45. err = domain.AddWaitingCluster(userId, userParallelism, algorithmObjectKey, task)
  46. if err != nil {
  47. infra.GlobalLogger.Error(err)
  48. continue
  49. }
  50. err = domain.DeleteWaitingUser(task.Info.TaskId)
  51. if err != nil {
  52. infra.GlobalLogger.Error(err)
  53. continue
  54. }
  55. }
  56. }
  57. global.RunTaskMutex.Unlock()
  58. }
  59. }
  60. // 集群等待队列中的任务判断是否可以加入集群运行队列
  61. func RunWaitingCluster() {
  62. infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。")
  63. for {
  64. var algorithmTarName string
  65. var algorithmTarPath string
  66. var algorithmImageName string
  67. time.Sleep(2 * time.Second)
  68. global.GpuNodeListMutex.Lock()
  69. // 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
  70. can, gpuNode, err := domain.CanRunCluster()
  71. if err != nil {
  72. infra.GlobalLogger.Error(err)
  73. global.GpuNodeListMutex.Unlock()
  74. continue
  75. }
  76. var firstTaskCache entity.TaskCache
  77. if can {
  78. //infra.GlobalLogger.Infof("节点 %v 有剩余并行度。", gpuNode)
  79. // 判断是否有待运行的任务
  80. waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
  81. if waitingClusterNumber == 0 {
  82. //infra.GlobalLogger.Info("集群没有等待运行的任务。")
  83. global.GpuNodeListMutex.Unlock()
  84. continue
  85. } else {
  86. infra.GlobalLogger.Infof("集群存在 %v 个等待运行的任务。", waitingClusterNumber)
  87. }
  88. // 取出但不移除
  89. {
  90. firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result()
  91. if err != nil {
  92. infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
  93. global.GpuNodeListMutex.Unlock()
  94. continue
  95. }
  96. firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
  97. if err != nil {
  98. infra.GlobalLogger.Error(err)
  99. global.GpuNodeListMutex.Unlock()
  100. continue
  101. }
  102. }
  103. // --------------- 下载算法 ---------------
  104. {
  105. infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey)
  106. algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
  107. algorithmTarPath = infra.ApplicationYaml.K8s.AlgorithmTarTempDir + util.NewShortUUID() + "/" + algorithmTarName
  108. _ = util.CreateParentDir(algorithmTarPath)
  109. algorithmImageName = infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
  110. _ = infra.GlobalOssBucket.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
  111. if err != nil {
  112. infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
  113. time.Sleep(time.Duration(2) * time.Second)
  114. global.GpuNodeListMutex.Unlock()
  115. continue
  116. }
  117. infra.GlobalLogger.Infof("下载算法 %v 成功。", firstTaskCache.AlgorithmObjectKey)
  118. }
  119. } else {
  120. infra.GlobalLogger.Infof("集群没有剩余并行度。")
  121. global.GpuNodeListMutex.Unlock()
  122. continue
  123. }
  124. global.GpuNodeListMutex.Unlock()
  125. // 获取项目ID
  126. projectId := firstTaskCache.Task.Info.ProjectId
  127. offsetKey := "offset:" + projectId
  128. offset := 0
  129. // 根据项目ID获取偏移量
  130. val, err := infra.GlobalRedisClient.Get(offsetKey).Result()
  131. if err != nil {
  132. infra.GlobalLogger.Infof("偏移量键 %v 不存在,初始化设置为 0。", offsetKey)
  133. err = infra.GlobalRedisClient.Set(offsetKey, 0, 0).Err()
  134. if err != nil {
  135. infra.GlobalLogger.Infof("偏移量键值对 %v 初始化失败,错误信息为: %v", offsetKey, err)
  136. continue
  137. }
  138. } else {
  139. offset, err = strconv.Atoi(val)
  140. if err != nil {
  141. infra.GlobalLogger.Infof("字符串 %v 转整数失败,错误信息为: %v", val, err)
  142. continue
  143. }
  144. infra.GlobalLogger.Infof("当前任务使用偏移量【%v】", offset)
  145. }
  146. // 取出偏移量后将缓存中的加一,给下个任务使用。
  147. _, err = infra.GlobalRedisClient.Incr(offsetKey).Result()
  148. if err != nil {
  149. infra.GlobalLogger.Infof("偏移量 %v 加一失败,错误信息为: %v", offsetKey, err)
  150. continue
  151. }
  152. infra.GlobalLogger.Infof("偏移量【%v】加一给下个任务使用。", offsetKey)
  153. // --------------- 发送 kafka 消息(获取偏移量和分区) ---------------
  154. // 获取任务消息转json
  155. // todo 将摄像头写死一个参数
  156. firstTaskCache.Task.Vehicle.Sensors.Camera = []entity.SensorCamera{
  157. {
  158. Sensor: entity.Sensor{
  159. SensorName: "custom_camera",
  160. SensorNear: 1.0,
  161. SensorFar: 1500.0,
  162. SensorX: -15000.0,
  163. SensorY: 0.0,
  164. SensorZ: 5000.0,
  165. SensorH: 0.0,
  166. SensorP: 10.0,
  167. SensorR: 0.0,
  168. },
  169. SensorForH: 45,
  170. SensorForV: 27,
  171. SensorResolution: "480*270",
  172. SensorFrameRate: 25,
  173. },
  174. }
  175. taskJson, err := TaskToJson(firstTaskCache.Task)
  176. if err != nil {
  177. infra.GlobalLogger.Error(err)
  178. continue
  179. }
  180. topic := projectId
  181. // 创建一个Message,并指定分区为0
  182. msg := &kafka.Message{
  183. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: infra.ApplicationYaml.Kafka.Partition, Offset: kafka.Offset(offset)},
  184. Value: []byte(taskJson),
  185. }
  186. // 发送消息,并处理结果
  187. err = infra.GlobalKafkaProducer.Produce(msg, nil)
  188. if err != nil {
  189. infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err)
  190. continue
  191. }
  192. infra.GlobalLogger.Infof("发送任务消息成功,话题为【%v】,偏移量为【%v】。", topic, offset)
  193. // 导入算法
  194. _, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageName)
  195. _, s, err = util.Execute("docker", "push", algorithmImageName)
  196. if err != nil {
  197. infra.GlobalLogger.Errorf("导入算法镜像 %v 为 %v 失败,执行结果为:%v,错误信息为:%v", algorithmTarPath, algorithmImageName, s, err)
  198. time.Sleep(time.Duration(2) * time.Second)
  199. continue
  200. }
  201. infra.GlobalLogger.Infof("导入算法镜像 %v 为 %v 成功,执行结果为:%v", algorithmTarPath, algorithmImageName, s)
  202. err = util.RemoveFile(algorithmTarPath)
  203. if err != nil {
  204. infra.GlobalLogger.Errorf("删除算法镜像文件 %v 失败,错误信息为:%v", algorithmTarPath, err)
  205. }
  206. // --------------- 启动 k8s pod ---------------
  207. podName := "project-" + projectId + "-" + util.NewShortUUID()
  208. namespaceName := infra.ApplicationYaml.K8s.NamespaceName
  209. nodeName := gpuNode.Hostname
  210. restParallelism := gpuNode.Parallelism
  211. vtdContainer := "vtd-" + projectId
  212. algorithmContainer := "algorithm-" + projectId
  213. vtdImage := infra.ApplicationYaml.K8s.VtdImage
  214. // 2 生成模板文件名称
  215. podYaml := nodeName + "#" + podName + ".yaml"
  216. // 3 模板yaml存储路径
  217. yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml
  218. // 4 模板yaml备份路径
  219. yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml
  220. fmt.Println(yamlPath, yamlPathBak)
  221. // 5
  222. podString, err := util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYaml)
  223. if err != nil {
  224. infra.GlobalLogger.Error(err)
  225. continue
  226. }
  227. podString = strings.Replace(podString, "pod-name", podName, -1)
  228. podString = strings.Replace(podString, "namespace-name", namespaceName, -1)
  229. podString = strings.Replace(podString, "node-name", nodeName, -1)
  230. podString = strings.Replace(podString, "algorithm-image", algorithmImageName, -1)
  231. podString = strings.Replace(podString, "vtd-container", vtdContainer, -1)
  232. podString = strings.Replace(podString, "vtd-image", vtdImage, -1)
  233. podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommand, -1)
  234. podString = strings.Replace(podString, "platform-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
  235. podString = strings.Replace(podString, "simulation-cloud-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
  236. podString = strings.Replace(podString, "platform-type", "\""+infra.ApplicationYaml.K8s.PlatformType+"\"", -1)
  237. podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.Oss.Type, -1)
  238. podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.Oss.Endpoint, -1) // 不带http://前缀
  239. podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.Oss.AccessKeyId, -1)
  240. podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.Oss.AccessKeySecret, -1)
  241. podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.Oss.BucketName, -1)
  242. podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Broker, -1)
  243. podString = strings.Replace(podString, "kafka-topic", projectId, -1)
  244. podString = strings.Replace(podString, "kafka-partition", "\""+util.ToString(infra.ApplicationYaml.Kafka.Partition)+"\"", -1)
  245. podString = strings.Replace(podString, "kafka-offset", "\""+util.ToString(offset)+"\"", -1)
  246. podString = strings.Replace(podString, "cpu-order", "\""+util.ToString(restParallelism-1)+"\"", -1) // cpu编号是剩余并行度-1
  247. podString = strings.Replace(podString, "algorithm-container", algorithmContainer, -1)
  248. // --------------- 保存成文件
  249. err = util.WriteFile(podString, yamlPath)
  250. err = util.WriteFile(podString, yamlPathBak)
  251. if err != nil {
  252. infra.GlobalLogger.Error("保存yaml字符串失败,错误信息为", err)
  253. continue
  254. }
  255. infra.GlobalLogger.Infof("保存yaml文件到执行路径【%v】和备份路径【%v】", yamlPath, yamlPathBak)
  256. // --------------- 启动 pod
  257. _, s2, err := util.Execute("kubectl", "apply", "-f", yamlPath)
  258. if err != nil {
  259. infra.GlobalLogger.Errorf("启动pod失败,执行结果为 %v,错误信息为 %v", s2, err)
  260. continue
  261. }
  262. infra.GlobalLogger.Errorf("启动pod成功,执行结果为 %v。", s2)
  263. // 收尾
  264. {
  265. // --------------- 添加到运行队列
  266. err = domain.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
  267. if err != nil {
  268. infra.GlobalLogger.Error(err)
  269. global.GpuNodeListMutex.Unlock()
  270. continue
  271. }
  272. // --------------- 从等待队列中移除
  273. _, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
  274. if err != nil {
  275. infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
  276. continue
  277. }
  278. // --------------- 删除镜像文件
  279. _ = util.RemoveFile(algorithmTarPath)
  280. }
  281. }
  282. }
  283. func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
  284. // 创建一个 Person 类型的变量
  285. var taskCache entity.TaskCache
  286. // 使用 json.Unmarshal 解析 JSON 字符串到结构体
  287. err := json.Unmarshal([]byte(jsonData), &taskCache)
  288. if err != nil {
  289. return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
  290. }
  291. return taskCache, nil
  292. }
  293. func TaskToJson(task entity.Task) (string, error) {
  294. jsonData, err := json.Marshal(task)
  295. if err != nil {
  296. return "", errors.New("转json失败,错误信息为:" + err.Error())
  297. }
  298. return string(jsonData), nil
  299. }