run_task.go 15 KB


  1. package service
  2. import (
  3. "dcl_dispatch_server/src/package/domain"
  4. "dcl_dispatch_server/src/package/entity"
  5. "dcl_dispatch_server/src/package/global"
  6. "dcl_dispatch_server/src/package/infra"
  7. "dcl_dispatch_server/src/package/util"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "github.com/confluentinc/confluent-kafka-go/kafka"
  12. "path/filepath"
  13. "strconv"
  14. "strings"
  15. "time"
  16. )
  17. /*
  18. 负责处理用户等待队列中的任务
  19. 负责运行集群等待队列中的任务
  20. */
  21. // RunWaitingUser 判断用户等待队列中的任务是否可以加入到集群等待队列
  22. func RunWaitingUser() {
  23. infra.GlobalLogger.Infof("启动【用户等待队列】监控进程。")
  24. for {
  25. time.Sleep(2 * time.Second)
  26. global.RunTaskMutex.Lock()
  27. // 获取Redis列表中的值
  28. taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
  29. if err != nil {
  30. infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
  31. continue
  32. }
  33. for _, taskCacheJson := range taskCacheJsons {
  34. taskCache, err := JsonToTaskCache(taskCacheJson)
  35. if err != nil {
  36. infra.GlobalLogger.Error(err)
  37. continue
  38. }
  39. userId := taskCache.UserId
  40. userParallelism := taskCache.UserParallelism
  41. algorithmObjectKey := taskCache.AlgorithmObjectKey
  42. equipmentType := taskCache.EquipmentType
  43. task := taskCache.Task
  44. // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
  45. if domain.CanRunUser(userId, userParallelism) { // 可以运行
  46. err = domain.AddWaitingCluster(&entity.Project{
  47. UserId: userId,
  48. Parallelism: userParallelism,
  49. AlgorithmObjectKey: algorithmObjectKey,
  50. EquipmentType: equipmentType,
  51. }, task)
  52. if err != nil {
  53. infra.GlobalLogger.Error(err)
  54. continue
  55. }
  56. err = domain.DeleteWaitingUser(task.Info.TaskId)
  57. if err != nil {
  58. infra.GlobalLogger.Error(err)
  59. continue
  60. }
  61. }
  62. }
  63. global.RunTaskMutex.Unlock()
  64. }
  65. }
  66. // RunWaitingCluster 集群等待队列中的任务判断是否可以加入集群运行队列
  67. func RunWaitingCluster() {
  68. infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。")
  69. for {
  70. time.Sleep(2 * time.Second)
  71. global.GpuNodeListMutex.Lock()
  72. // 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
  73. can, gpuNode, err := domain.CanRunCluster()
  74. if err != nil {
  75. infra.GlobalLogger.Error(err)
  76. global.GpuNodeListMutex.Unlock()
  77. continue
  78. }
  79. firstTaskCache := entity.TaskCache{}
  80. algorithmTarName := ""
  81. algorithmTarPath := ""
  82. algorithmImageName := ""
  83. newAlgorithm := false
  84. if can {
  85. //infra.GlobalLogger.Infof("节点 %v 有剩余并行度。", gpuNode)
  86. // 判断是否有待运行的任务
  87. waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
  88. if waitingClusterNumber == 0 {
  89. //infra.GlobalLogger.Info("集群没有等待运行的任务。")
  90. global.GpuNodeListMutex.Unlock()
  91. continue
  92. } else {
  93. infra.GlobalLogger.Infof("集群存在 %v 个等待运行的任务。", waitingClusterNumber)
  94. }
  95. // 取出并移出,20241017 防止报错后陷入死循环
  96. {
  97. firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result()
  98. if err != nil {
  99. infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
  100. global.GpuNodeListMutex.Unlock()
  101. continue
  102. }
  103. firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
  104. if err != nil {
  105. infra.GlobalLogger.Error(err)
  106. global.GpuNodeListMutex.Unlock()
  107. continue
  108. }
  109. // --------------- 从等待队列中移除
  110. if _, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result(); err != nil {
  111. infra.GlobalLogger.Error(err)
  112. continue
  113. }
  114. }
  115. // --------------- 下载算法 --------------- todo 算法这里需要控制已经下载过的算法就不要再次下载了
  116. {
  117. infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey)
  118. algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
  119. algorithmTarPath = infra.ApplicationYaml.K8s.AlgorithmTarTempDir + util.NewShortUUID() + "/" + algorithmTarName
  120. _ = util.CreateParentDir(algorithmTarPath)
  121. algorithmImageName = infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
  122. newAlgorithm = util.ImageExists(infra.GlobalDockerClient, algorithmImageName)
  123. if newAlgorithm {
  124. if firstTaskCache.EquipmentType == "JIN_LONG_BA_SHI" || firstTaskCache.EquipmentType == "PU_JIN_DUO_GONG_NENG_CHE" {
  125. err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
  126. } else {
  127. err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
  128. }
  129. if err != nil {
  130. infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
  131. time.Sleep(time.Duration(2) * time.Second)
  132. global.GpuNodeListMutex.Unlock()
  133. continue
  134. }
  135. infra.GlobalLogger.Infof("下载算法 %v 成功。", firstTaskCache.AlgorithmObjectKey)
  136. } else {
  137. infra.GlobalLogger.Infof("算法 %v 已存在。", algorithmImageName)
  138. }
  139. }
  140. } else {
  141. infra.GlobalLogger.Infof("集群没有剩余并行度。")
  142. global.GpuNodeListMutex.Unlock()
  143. continue
  144. }
  145. global.GpuNodeListMutex.Unlock()
  146. // 获取项目ID
  147. projectId := firstTaskCache.Task.Info.ProjectId
  148. offsetKey := "offset:" + projectId
  149. offset := 0
  150. // 根据项目ID获取偏移量
  151. val, err := infra.GlobalRedisClient.Get(offsetKey).Result()
  152. if err != nil {
  153. infra.GlobalLogger.Infof("偏移量键 %v 不存在,初始化设置为 0。", offsetKey)
  154. err = infra.GlobalRedisClient.Set(offsetKey, 0, 0).Err()
  155. if err != nil {
  156. infra.GlobalLogger.Infof("偏移量键值对 %v 初始化失败,错误信息为: %v", offsetKey, err)
  157. continue
  158. }
  159. } else {
  160. offset, err = strconv.Atoi(val)
  161. if err != nil {
  162. infra.GlobalLogger.Infof("字符串 %v 转整数失败,错误信息为: %v", val, err)
  163. continue
  164. }
  165. infra.GlobalLogger.Infof("当前任务使用偏移量【%v】", offset)
  166. }
  167. // 取出偏移量后将缓存中的加一,给下个任务使用。
  168. _, err = infra.GlobalRedisClient.Incr(offsetKey).Result()
  169. if err != nil {
  170. infra.GlobalLogger.Infof("偏移量 %v 加一失败,错误信息为: %v", offsetKey, err)
  171. continue
  172. }
  173. infra.GlobalLogger.Infof("偏移量【%v】加一给下个任务使用。", offsetKey)
  174. // --------------- 发送 kafka 消息(获取偏移量和分区) ---------------
  175. // 获取任务消息转json
  176. // 将摄像头写死一个参数
  177. firstTaskCache.Task.Vehicle.Sensors.Camera = []entity.SensorCamera{
  178. {
  179. Sensor: entity.Sensor{
  180. SensorName: "custom_camera",
  181. SensorNear: 1.0,
  182. SensorFar: 1500.0,
  183. SensorX: -15000.0,
  184. SensorY: 0.0,
  185. SensorZ: 5000.0,
  186. SensorH: 0.0,
  187. SensorP: 10.0,
  188. SensorR: 0.0,
  189. },
  190. SensorForH: 45,
  191. SensorForV: 27,
  192. SensorResolution: "480*270",
  193. SensorFrameRate: 25,
  194. },
  195. }
  196. taskJson, err := TaskToJson(firstTaskCache.Task)
  197. if err != nil {
  198. infra.GlobalLogger.Error(err)
  199. continue
  200. }
  201. topic := projectId
  202. // 创建一个Message,并指定分区为0
  203. msg := &kafka.Message{
  204. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: infra.ApplicationYaml.Kafka.Partition, Offset: kafka.Offset(offset)},
  205. Value: []byte(taskJson),
  206. }
  207. // 发送消息,并处理结果
  208. err = infra.GlobalKafkaProducer.Produce(msg, nil)
  209. if err != nil {
  210. infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err)
  211. continue
  212. }
  213. infra.GlobalLogger.Infof("发送任务消息成功,话题为【%v】,偏移量为【%v】。", topic, offset)
  214. // 如果新算法需要导入
  215. if newAlgorithm {
  216. // 导入算法
  217. _, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageName)
  218. _, s, err = util.Execute("docker", "push", algorithmImageName)
  219. if err != nil {
  220. infra.GlobalLogger.Errorf("导入算法镜像 %v 为 %v 失败,执行结果为:%v,错误信息为:%v", algorithmTarPath, algorithmImageName, s, err)
  221. time.Sleep(time.Duration(2) * time.Second)
  222. continue
  223. }
  224. infra.GlobalLogger.Infof("导入算法镜像 %v 为 %v 成功,执行结果为:%v", algorithmTarPath, algorithmImageName, s)
  225. err = util.RemoveFile(algorithmTarPath)
  226. if err != nil {
  227. infra.GlobalLogger.Errorf("删除算法镜像文件 %v 失败,错误信息为:%v", algorithmTarPath, err)
  228. }
  229. }
  230. // --------------- 启动 k8s pod ---------------
  231. podName := "project-" + projectId + "-" + util.NewShortUUID()
  232. namespaceName := infra.ApplicationYaml.K8s.NamespaceName
  233. nodeName := gpuNode.Hostname
  234. restParallelism := gpuNode.Parallelism
  235. vtdContainer := "vtd-" + projectId
  236. algorithmContainer := "algorithm-" + projectId
  237. vtdImage := infra.ApplicationYaml.K8s.VtdImage
  238. // 2 生成模板文件名称
  239. podYaml := nodeName + "#" + podName + ".yaml"
  240. // 3 模板yaml存储路径
  241. yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml
  242. // 4 模板yaml备份路径
  243. yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml
  244. fmt.Println(yamlPath, yamlPathBak)
  245. // 5
  246. podString := ""
  247. if firstTaskCache.EquipmentType == "JIN_LONG_BA_SHI" || firstTaskCache.EquipmentType == "PU_JIN_DUO_GONG_NENG_CHE" { // 多功能车仿真
  248. if podString, err = util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYamlPjisuv); err != nil {
  249. infra.GlobalLogger.Error(err)
  250. continue
  251. }
  252. podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjisuv, -1)
  253. podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssCicv.Type, -1)
  254. podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssCicv.Endpoint, -1) // 不带http://前缀
  255. podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssCicv.AccessKeyId, -1)
  256. podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.OssCicv.AccessKeySecret, -1)
  257. podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.OssCicv.BucketName, -1)
  258. } else {
  259. if podString, err = util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYamlPjibot); err != nil {
  260. infra.GlobalLogger.Error(err)
  261. continue
  262. }
  263. podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjibot, -1)
  264. podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssPji.Type, -1)
  265. podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssPji.Endpoint, -1) // 不带http://前缀
  266. podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssPji.AccessKeyId, -1)
  267. podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.OssPji.AccessKeySecret, -1)
  268. podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.OssPji.BucketName, -1)
  269. }
  270. podString = strings.Replace(podString, "pod-name", podName, -1)
  271. podString = strings.Replace(podString, "namespace-name", namespaceName, -1)
  272. podString = strings.Replace(podString, "node-name", nodeName, -1)
  273. podString = strings.Replace(podString, "algorithm-image", algorithmImageName, -1)
  274. podString = strings.Replace(podString, "vtd-container", vtdContainer, -1)
  275. podString = strings.Replace(podString, "vtd-image", vtdImage, -1)
  276. podString = strings.Replace(podString, "platform-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
  277. podString = strings.Replace(podString, "simulation-cloud-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
  278. podString = strings.Replace(podString, "platform-type", "\""+infra.ApplicationYaml.K8s.PlatformType+"\"", -1)
  279. podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Broker, -1)
  280. podString = strings.Replace(podString, "kafka-topic", projectId, -1)
  281. podString = strings.Replace(podString, "kafka-partition", "\""+util.ToString(infra.ApplicationYaml.Kafka.Partition)+"\"", -1)
  282. podString = strings.Replace(podString, "kafka-offset", "\""+util.ToString(offset)+"\"", -1)
  283. podString = strings.Replace(podString, "cpu-order", "\""+util.ToString(restParallelism-1)+"\"", -1) // cpu编号是剩余并行度-1
  284. podString = strings.Replace(podString, "algorithm-container", algorithmContainer, -1)
  285. // 从xosc中解析起终点位置
  286. xoscOssPath := firstTaskCache.Task.Scenario.ScenarioOsc
  287. tempDir := "/mnt/disk001/dcl_dispatch_server/temp/"
  288. util.CreateDir(tempDir)
  289. xoscLocalPath := tempDir + util.NewShortUUID() + ".xosc"
  290. if firstTaskCache.EquipmentType == "JIN_LONG_BA_SHI" || firstTaskCache.EquipmentType == "PU_JIN_DUO_GONG_NENG_CHE" {
  291. err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath)
  292. } else {
  293. err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath)
  294. }
  295. if err != nil {
  296. infra.GlobalLogger.Errorf("下载xosc文件【%v】失败,错误信息为:%v", xoscOssPath, err)
  297. continue
  298. }
  299. {
  300. s1, s2, s3, s4 := util.GetStartAndEnd(xoscLocalPath)
  301. podString = strings.Replace(podString, "start-position-x", "\""+s1+"\"", -1)
  302. podString = strings.Replace(podString, "start-position-y", "\""+s2+"\"", -1)
  303. podString = strings.Replace(podString, "end-position-x", "\""+s3+"\"", -1)
  304. podString = strings.Replace(podString, "end-position-y", "\""+s4+"\"", -1)
  305. }
  306. // --------------- 保存成文件
  307. err = util.WriteFile(podString, yamlPath)
  308. err = util.WriteFile(podString, yamlPathBak)
  309. if err != nil {
  310. infra.GlobalLogger.Error("保存yaml字符串失败,错误信息为", err)
  311. continue
  312. }
  313. infra.GlobalLogger.Infof("保存yaml文件到执行路径【%v】和备份路径【%v】", yamlPath, yamlPathBak)
  314. // --------------- 启动 pod
  315. _, s2, err := util.Execute("kubectl", "apply", "-f", yamlPath)
  316. if err != nil {
  317. infra.GlobalLogger.Errorf("启动pod失败,执行结果为 %v,错误信息为 %v", s2, err)
  318. continue
  319. }
  320. infra.GlobalLogger.Errorf("启动pod成功,执行结果为 %v。", s2)
  321. // 收尾
  322. {
  323. // --------------- 添加到运行队列
  324. err = domain.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
  325. if err != nil {
  326. infra.GlobalLogger.Error(err)
  327. global.GpuNodeListMutex.Unlock()
  328. continue
  329. }
  330. // --------------- 删除镜像文件
  331. _ = util.RemoveFile(algorithmTarPath)
  332. }
  333. }
  334. }
  335. func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
  336. // 创建一个 Person 类型的变量
  337. var taskCache entity.TaskCache
  338. // 使用 json.Unmarshal 解析 JSON 字符串到结构体
  339. err := json.Unmarshal([]byte(jsonData), &taskCache)
  340. if err != nil {
  341. return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
  342. }
  343. return taskCache, nil
  344. }
  345. func TaskToJson(task entity.Task) (string, error) {
  346. jsonData, err := json.Marshal(task)
  347. if err != nil {
  348. return "", errors.New("转json失败,错误信息为:" + err.Error())
  349. }
  350. return string(jsonData), nil
  351. }