old_interface_adapter.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package handler
  2. import (
  3. "dcl_dispatch_server/src/package/domain/task_cache"
  4. "dcl_dispatch_server/src/package/infra"
  5. "dcl_dispatch_server/src/package/infra/entity"
  6. "dcl_dispatch_server/src/package/infra/global"
  7. "dcl_dispatch_server/src/package/util"
  8. "github.com/gin-gonic/gin"
  9. "net/http"
  10. )
  11. // Confirm 返回 true(用来确认任务是否执行的,直接返回 true 即可)
  12. func Confirm(c *gin.Context) {
  13. c.String(http.StatusOK, "true")
  14. }
  15. // Tick 接收心跳,此处没有其他处理
  16. func Tick(c *gin.Context) {
  17. c.String(http.StatusOK, "true")
  18. }
  19. func State(c *gin.Context) {
  20. taskId := c.Query("taskId")
  21. state := c.Query("state")
  22. podName := c.Query("podName")
  23. if taskId == "" || state == "" || podName == "" {
  24. infra.GlobalLogger.Error("任务状态接口请求参数错误,需要GET请求和【taskId】【state】【podName】三个参数。")
  25. c.JSON(http.StatusBadRequest, entity.HttpResult{Status: false, Code: "1003", Message: "请求参数格式错误。"})
  26. return
  27. }
  28. if state != "Running" {
  29. err := util.DeletePod(infra.ClientSet, podName, infra.ApplicationYaml.K8s.NamespaceName)
  30. if err != nil {
  31. infra.GlobalLogger.Errorf("删除pod【%v】失败", podName)
  32. }
  33. // 根据taskId删除缓存中的pod元素
  34. runningTaskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueRunningCluster, 0, -1).Result()
  35. if err != nil {
  36. infra.GlobalLogger.Errorf("获取运行中的任务队列【%v】失败", global.KeyTaskQueueRunningCluster)
  37. }
  38. for _, json := range runningTaskCacheJsons {
  39. cache, err := task_cache.JsonToTaskCache(json)
  40. if err != nil {
  41. infra.GlobalLogger.Error(err)
  42. }
  43. if cache.Task.Info.TaskId == taskId {
  44. // 1 获取任务id关联的节点
  45. nodeName, _ := infra.GlobalRedisClient.Get(global.KeyTaskToNode + ":" + taskId).Result()
  46. // 2 归还并行度
  47. global.ParallelismMutex.Lock()
  48. gpuNodeJsons, _ := infra.GlobalRedisClient.LRange(global.KeyGpuNodeList, 0, -1).Result()
  49. for i, gpuNodeJson := range gpuNodeJsons {
  50. node, _ := infra.JsonToGpuNode(gpuNodeJson)
  51. if node.Hostname == nodeName {
  52. node.Parallelism++
  53. nodeJson, _ := infra.GpuNodeToJson(node)
  54. _, _ = infra.GlobalRedisClient.LSet(global.KeyGpuNodeList, int64(i), nodeJson).Result()
  55. }
  56. }
  57. global.ParallelismMutex.Unlock()
  58. // 3 删除任务队列中的元素
  59. _, _ = infra.GlobalRedisClient.LRem(global.KeyTaskQueueRunningCluster, 0, json).Result()
  60. }
  61. }
  62. }
  63. // 发送http请求到索为系统
  64. /*
  65. 仿真测试任务回调接口:
  66. 1、接口url
  67. http://1.202.169.139:8081/project/task/callback
  68. 2、请求方式
  69. POST
  70. 3、请求体
  71. {
  72. "taskId":1764907332423110657,
  73. "state":"RUNNING"
  74. }
  75. 4、响应结果
  76. {
  77. "data":true/false,
  78. "success": true,
  79. "message": "ok",
  80. "code": 1,
  81. "nowTime": "2024-05-10 15:57:42"
  82. }
  83. */
  84. _, err := util.PostJsonResponseJson(
  85. //"http://1.202.169.139:8081/project/task/callback",
  86. //"http://10.14.86.127:9081/project/task/callback",
  87. infra.ApplicationYaml.K8s.CallbackUri,
  88. map[string]string{
  89. "taskId": taskId,
  90. "state": state,
  91. },
  92. )
  93. if err != nil {
  94. infra.GlobalLogger.Error(err)
  95. c.String(http.StatusOK, "false")
  96. }
  97. c.String(http.StatusOK, "true")
  98. }