run_task.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. package service
  2. import (
  3. "dcl_dispatch_server/src/package/domain/project"
  4. "dcl_dispatch_server/src/package/domain/task_cache"
  5. "dcl_dispatch_server/src/package/global"
  6. "dcl_dispatch_server/src/package/infra"
  7. "dcl_dispatch_server/src/package/infra/redis"
  8. "dcl_dispatch_server/src/package/util"
  9. "fmt"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/confluentinc/confluent-kafka-go/kafka"
  15. )
  /*
  This file is responsible for:
    - processing the tasks held in the per-user waiting queue;
    - running the tasks held in the cluster waiting queue.
  */
  20. // RunWaitingUser 判断用户等待队列中的任务是否可以加入到集群等待队列
  21. func RunWaitingUser() {
  22. infra.GlobalLogger.Infof("启动【用户等待队列】监控进程。")
  23. for {
  24. time.Sleep(2 * time.Second)
  25. global.RunTaskMutex.Lock()
  26. // 获取Redis列表中的值
  27. taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
  28. if err != nil {
  29. infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
  30. continue
  31. }
  32. for _, taskCacheJson := range taskCacheJsons {
  33. taskCache, err := task_cache.JsonToTaskCache(taskCacheJson)
  34. if err != nil {
  35. infra.GlobalLogger.Error(err)
  36. continue
  37. }
  38. userId := taskCache.UserId
  39. userParallelism := taskCache.UserParallelism
  40. algorithmObjectKey := taskCache.AlgorithmObjectKey
  41. equipmentType := taskCache.EquipmentType
  42. task := taskCache.Task
  43. // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
  44. if redis.CanRunUser(userId, userParallelism) { // 可以运行
  45. err = redis.AddWaitingCluster(&project.Project{
  46. UserId: userId,
  47. Parallelism: userParallelism,
  48. AlgorithmObjectKey: algorithmObjectKey,
  49. EquipmentType: equipmentType,
  50. }, task)
  51. if err != nil {
  52. infra.GlobalLogger.Error(err)
  53. continue
  54. }
  55. err = redis.DeleteWaitingUser(task.Info.TaskId)
  56. if err != nil {
  57. infra.GlobalLogger.Error(err)
  58. continue
  59. }
  60. }
  61. }
  62. global.RunTaskMutex.Unlock()
  63. }
  64. }
  65. // 集群等待队列中的任务判断是否可以加入集群运行队列
  66. func RunWaitingCluster() {
  67. infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。")
  68. for {
  69. time.Sleep(2 * time.Second)
  70. global.GpuNodeListMutex.Lock()
  71. // 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
  72. can, gpuNode, err := redis.CanRunCluster()
  73. if err != nil {
  74. infra.GlobalLogger.Error(err)
  75. global.GpuNodeListMutex.Unlock()
  76. continue
  77. }
  78. firstTaskCache := task_cache.TaskCache{}
  79. algorithmTarName := ""
  80. algorithmTarPath := ""
  81. algorithmImageName := ""
  82. algorithmImageNameWithVersion := ""
  83. algorithmExist := false
  84. if can {
  85. // 判断是否有待运行的任务
  86. waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
  87. if waitingClusterNumber == 0 {
  88. global.GpuNodeListMutex.Unlock()
  89. continue
  90. } else {
  91. infra.GlobalLogger.Infof("集群存在 %v 个等待运行的任务。", waitingClusterNumber)
  92. }
  93. // 取出并移出,20241017 防止报错后陷入死循环
  94. {
  95. firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result()
  96. if err != nil {
  97. infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
  98. global.GpuNodeListMutex.Unlock()
  99. continue
  100. }
  101. firstTaskCache, err = task_cache.JsonToTaskCache(firstTaskCacheJson)
  102. if err != nil {
  103. infra.GlobalLogger.Error(err)
  104. global.GpuNodeListMutex.Unlock()
  105. continue
  106. }
  107. // --------------- 从等待队列中移除
  108. if _, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result(); err != nil {
  109. infra.GlobalLogger.Error(err)
  110. continue
  111. }
  112. }
  113. // --------------- 下载算法 ---------------
  114. {
  115. infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey)
  116. algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
  117. algorithmTarPath = infra.ApplicationYaml.K8s.AlgorithmTarTempDir + util.NewShortUUID() + "/" + algorithmTarName
  118. _ = util.CreateParentDir(algorithmTarPath)
  119. algorithmImageName = infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
  120. algorithmImageNameWithVersion = algorithmImageName + ":latest"
  121. algorithmExist = util.ImageExists(infra.GlobalDockerClient, algorithmImageName)
  122. if !algorithmExist {
  123. if firstTaskCache.Env == "cicv" {
  124. err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
  125. } else {
  126. err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
  127. }
  128. if err != nil {
  129. infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
  130. time.Sleep(time.Duration(2) * time.Second)
  131. global.GpuNodeListMutex.Unlock()
  132. continue
  133. }
  134. infra.GlobalLogger.Infof("下载算法 %v 成功。", firstTaskCache.AlgorithmObjectKey)
  135. } else {
  136. infra.GlobalLogger.Infof("算法 %v 已存在。", algorithmImageName)
  137. }
  138. }
  139. } else {
  140. infra.GlobalLogger.Infof("集群没有剩余并行度。")
  141. global.GpuNodeListMutex.Unlock()
  142. continue
  143. }
  144. global.GpuNodeListMutex.Unlock()
  145. // 获取项目ID
  146. projectId := firstTaskCache.Task.Info.ProjectId
  147. offsetKey := "offset:" + projectId
  148. offset := 0
  149. // 根据项目ID获取偏移量
  150. val, err := infra.GlobalRedisClient.Get(offsetKey).Result()
  151. if err != nil {
  152. infra.GlobalLogger.Infof("偏移量键 %v 不存在,初始化设置为 0。", offsetKey)
  153. err = infra.GlobalRedisClient.Set(offsetKey, 0, 0).Err()
  154. if err != nil {
  155. infra.GlobalLogger.Infof("偏移量键值对 %v 初始化失败,错误信息为: %v", offsetKey, err)
  156. continue
  157. }
  158. } else {
  159. offset, err = strconv.Atoi(val)
  160. if err != nil {
  161. infra.GlobalLogger.Infof("字符串 %v 转整数失败,错误信息为: %v", val, err)
  162. continue
  163. }
  164. infra.GlobalLogger.Infof("当前任务使用偏移量【%v】", offset)
  165. }
  166. // 取出偏移量后将缓存中的加一,给下个任务使用。
  167. _, err = infra.GlobalRedisClient.Incr(offsetKey).Result()
  168. if err != nil {
  169. infra.GlobalLogger.Infof("偏移量 %v 加一失败,错误信息为: %v", offsetKey, err)
  170. continue
  171. }
  172. infra.GlobalLogger.Infof("偏移量【%v】加一给下个任务使用。", offsetKey)
  173. // ------- 解析 xosc ,从xosc中解析起终点位置&修改 xodr 和 osgb ---------------
  174. xoscOssPath := firstTaskCache.Task.Scenario.ScenarioOsc
  175. tempDir := "/mnt/disk001/dcl_dispatch_server/temp/"
  176. util.CreateDir(tempDir)
  177. xoscLocalPath := tempDir + util.NewShortUUID() + ".xosc"
  178. if firstTaskCache.Env == "cicv" { // cicv 或 pji
  179. err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath)
  180. } else {
  181. err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath)
  182. }
  183. if err != nil {
  184. infra.GlobalLogger.Errorf("下载xosc文件【%v】失败,错误信息为:%v", xoscOssPath, err)
  185. continue
  186. }
  187. s1, s2, s3, s4, xodrPath, osgbPath := util.ParseXosc(xoscLocalPath)
  188. firstTaskCache.Task.Scenario.ScenarioOdr = xodrPath
  189. firstTaskCache.Task.Scenario.ScenarioOsgb = osgbPath
  190. // ------- 修改OGT传感器显示目标物框 -------
  191. for i := range firstTaskCache.Task.Vehicle.Sensors.OGT {
  192. firstTaskCache.Task.Vehicle.Sensors.OGT[i].SensorDisplay = true
  193. }
  194. // --------------- 发送 kafka 消息(获取偏移量和分区) ---------------
  195. // 获取任务消息转json
  196. firstTaskCache.Task.Vehicle.Sensors.Camera = global.DefaultCameras
  197. taskJson, err := project.TaskToJson(firstTaskCache.Task)
  198. if err != nil {
  199. infra.GlobalLogger.Error(err)
  200. continue
  201. }
  202. // ------- 发送 -------
  203. topic := projectId
  204. // 创建一个Message,并指定分区为0
  205. msg := &kafka.Message{
  206. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: infra.ApplicationYaml.Kafka.Partition, Offset: kafka.Offset(offset)},
  207. Value: []byte(taskJson),
  208. }
  209. // 发送消息,并处理结果
  210. err = infra.GlobalKafkaProducer.Produce(msg, nil)
  211. if err != nil {
  212. infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err)
  213. continue
  214. }
  215. infra.GlobalLogger.Infof("发送任务消息成功,话题为【%v】,偏移量为【%v】,消息为【%v】", topic, offset, taskJson)
  216. // 如果新算法需要导入
  217. if !algorithmExist {
  218. // 导入算法
  219. infra.GlobalLogger.Infof("导入算法文件【%v】到docker镜像【%v】。", algorithmTarPath, algorithmImageNameWithVersion)
  220. _, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageNameWithVersion)
  221. infra.GlobalLogger.Infof("推送算法镜像【%v】。", algorithmImageNameWithVersion)
  222. _, s, err = util.Execute("docker", "push", algorithmImageNameWithVersion)
  223. if err != nil {
  224. infra.GlobalLogger.Errorf("导入算法镜像 %v 为 %v 失败,执行结果为:%v,错误信息为:%v", algorithmTarPath, algorithmImageNameWithVersion, s, err)
  225. time.Sleep(time.Duration(2) * time.Second)
  226. continue
  227. }
  228. infra.GlobalLogger.Infof("导入算法镜像 %v 为 %v 成功,执行结果为:%v", algorithmTarPath, algorithmImageNameWithVersion, s)
  229. err = util.RemoveFile(algorithmTarPath)
  230. if err != nil {
  231. infra.GlobalLogger.Errorf("删除算法镜像文件 %v 失败,错误信息为:%v", algorithmTarPath, err)
  232. }
  233. }
  234. // --------------- 启动 k8s pod ---------------
  235. podName := "project-" + projectId + "-" + util.NewShortUUID()
  236. namespaceName := infra.ApplicationYaml.K8s.NamespaceName
  237. nodeName := gpuNode.Hostname
  238. restParallelism := gpuNode.Parallelism
  239. vtdContainer := "vtd-" + projectId
  240. algorithmContainer := "algorithm-" + projectId
  241. vtdImage := infra.ApplicationYaml.K8s.VtdImage
  242. // 2 生成模板文件名称
  243. podYaml := nodeName + "#" + podName + ".yaml"
  244. // 3 模板yaml存储路径
  245. yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml
  246. // 4 模板yaml备份路径
  247. yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml
  248. fmt.Println(yamlPath, yamlPathBak)
  249. // 5
  250. podString := ""
  251. if firstTaskCache.EquipmentType == "JIN_LONG_BA_SHI" || firstTaskCache.EquipmentType == "PU_JIN_DUO_GONG_NENG_CHE" { // 多功能车仿真
  252. if podString, err = util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYamlPjisuv); err != nil {
  253. infra.GlobalLogger.Error(err)
  254. continue
  255. }
  256. podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjisuv, -1)
  257. } else {
  258. if podString, err = util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYamlPjibot); err != nil {
  259. infra.GlobalLogger.Error(err)
  260. continue
  261. }
  262. podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjibot, -1)
  263. }
  264. if firstTaskCache.Env == "cicv" {
  265. podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssCicv.Type, -1)
  266. podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssCicv.Endpoint, -1) // 不带http://前缀
  267. podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssCicv.AccessKeyId, -1)
  268. podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.OssCicv.AccessKeySecret, -1)
  269. podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.OssCicv.BucketName, -1)
  270. } else {
  271. podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssPji.Type, -1)
  272. podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssPji.Endpoint, -1) // 不带http://前缀
  273. podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssPji.AccessKeyId, -1)
  274. podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.OssPji.AccessKeySecret, -1)
  275. podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.OssPji.BucketName, -1)
  276. }
  277. podString = strings.Replace(podString, "pod-name", podName, -1)
  278. podString = strings.Replace(podString, "namespace-name", namespaceName, -1)
  279. podString = strings.Replace(podString, "node-name", nodeName, -1)
  280. podString = strings.Replace(podString, "algorithm-image", algorithmImageNameWithVersion, -1)
  281. podString = strings.Replace(podString, "vtd-container", vtdContainer, -1)
  282. podString = strings.Replace(podString, "vtd-image", vtdImage, -1)
  283. podString = strings.Replace(podString, "platform-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
  284. podString = strings.Replace(podString, "simulation-cloud-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
  285. podString = strings.Replace(podString, "platform-type", "\""+infra.ApplicationYaml.K8s.PlatformType+"\"", -1)
  286. podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Broker, -1)
  287. podString = strings.Replace(podString, "kafka-topic", projectId, -1)
  288. podString = strings.Replace(podString, "kafka-partition", "\""+util.ToString(infra.ApplicationYaml.Kafka.Partition)+"\"", -1)
  289. podString = strings.Replace(podString, "kafka-offset", "\""+util.ToString(offset)+"\"", -1)
  290. podString = strings.Replace(podString, "cpu-order", "\""+util.ToString(restParallelism-1)+"\"", -1) // cpu编号是剩余并行度-1
  291. podString = strings.Replace(podString, "algorithm-container", algorithmContainer, -1)
  292. podString = strings.Replace(podString, "start-position-x", "\""+s1+"\"", -1)
  293. podString = strings.Replace(podString, "start-position-y", "\""+s2+"\"", -1)
  294. podString = strings.Replace(podString, "end-position-x", "\""+s3+"\"", -1)
  295. podString = strings.Replace(podString, "end-position-y", "\""+s4+"\"", -1)
  296. // --------------- 保存成文件
  297. err = util.WriteFile(podString, yamlPath)
  298. err = util.WriteFile(podString, yamlPathBak)
  299. if err != nil {
  300. infra.GlobalLogger.Error("保存yaml字符串失败,错误信息为", err)
  301. continue
  302. }
  303. infra.GlobalLogger.Infof("保存yaml文件到执行路径【%v】和备份路径【%v】", yamlPath, yamlPathBak)
  304. // --------------- 启动 pod
  305. _, sr, err := util.Execute("kubectl", "apply", "-f", yamlPath)
  306. if err != nil {
  307. infra.GlobalLogger.Errorf("启动pod失败,执行结果为 %v,错误信息为 %v", s2, err)
  308. continue
  309. }
  310. infra.GlobalLogger.Errorf("启动pod成功,执行结果为 %v。", sr)
  311. // 收尾
  312. {
  313. // --------------- 添加到运行队列
  314. err = redis.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
  315. if err != nil {
  316. infra.GlobalLogger.Error(err)
  317. global.GpuNodeListMutex.Unlock()
  318. continue
  319. }
  320. // --------------- 删除镜像文件
  321. _ = util.RemoveFile(algorithmTarPath)
  322. }
  323. }
  324. }