// File: old_interface_adapter.go (3.2 KB)

  1. package handler
  2. import (
  3. "dcl_dispatch_server/src/package/domain"
  4. "dcl_dispatch_server/src/package/domain/task_cache"
  5. "dcl_dispatch_server/src/package/entity"
  6. "dcl_dispatch_server/src/package/global"
  7. "dcl_dispatch_server/src/package/infra"
  8. "dcl_dispatch_server/src/package/util"
  9. "github.com/gin-gonic/gin"
  10. "net/http"
  11. )
  12. // Confirm 返回 true(用来确认任务是否执行的,直接返回 true 即可)
  13. func Confirm(c *gin.Context) {
  14. c.String(http.StatusOK, "true")
  15. }
  16. // Tick 接收心跳,此处没有其他处理
  17. func Tick(c *gin.Context) {
  18. c.String(http.StatusOK, "true")
  19. }
  20. func State(c *gin.Context) {
  21. taskId := c.Query("taskId")
  22. state := c.Query("state")
  23. podName := c.Query("podName")
  24. if taskId == "" || state == "" || podName == "" {
  25. infra.GlobalLogger.Error("任务状态接口请求参数错误,需要GET请求和【taskId】【state】【podName】三个参数。")
  26. c.JSON(http.StatusBadRequest, entity.HttpResult{Status: false, Code: "1003", Message: "请求参数格式错误。"})
  27. return
  28. }
  29. if state != "Running" {
  30. err := util.DeletePod(infra.ClientSet, podName, infra.ApplicationYaml.K8s.NamespaceName)
  31. if err != nil {
  32. infra.GlobalLogger.Errorf("删除pod【%v】失败", podName)
  33. }
  34. // 根据taskId删除缓存中的pod元素
  35. runningTaskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueRunningCluster, 0, -1).Result()
  36. if err != nil {
  37. infra.GlobalLogger.Errorf("获取运行中的任务队列【%v】失败", global.KeyTaskQueueRunningCluster)
  38. }
  39. for _, json := range runningTaskCacheJsons {
  40. cache, err := task_cache.JsonToTaskCache(json)
  41. if err != nil {
  42. infra.GlobalLogger.Error(err)
  43. }
  44. if cache.Task.Info.TaskId == taskId {
  45. // 1 获取任务id关联的节点
  46. nodeName, _ := infra.GlobalRedisClient.Get(global.KeyTaskToNode + ":" + taskId).Result()
  47. // 2 归还并行度
  48. global.ParallelismMutex.Lock()
  49. gpuNodeJsons, _ := infra.GlobalRedisClient.LRange(global.KeyGpuNodeList, 0, -1).Result()
  50. for i, gpuNodeJson := range gpuNodeJsons {
  51. node, _ := domain.JsonToGpuNode(gpuNodeJson)
  52. if node.Hostname == nodeName {
  53. node.Parallelism++
  54. nodeJson, _ := domain.GpuNodeToJson(node)
  55. _, _ = infra.GlobalRedisClient.LSet(global.KeyGpuNodeList, int64(i), nodeJson).Result()
  56. }
  57. }
  58. global.ParallelismMutex.Unlock()
  59. // 3 删除任务队列中的元素
  60. _, _ = infra.GlobalRedisClient.LRem(global.KeyTaskQueueRunningCluster, 0, json).Result()
  61. }
  62. }
  63. }
  64. // 发送http请求到索为系统
  65. /*
  66. 仿真测试任务回调接口:
  67. 1、接口url
  68. http://1.202.169.139:8081/project/task/callback
  69. 2、请求方式
  70. POST
  71. 3、请求体
  72. {
  73. "taskId":1764907332423110657,
  74. "state":"RUNNING"
  75. }
  76. 4、响应结果
  77. {
  78. "data":true/false,
  79. "success": true,
  80. "message": "ok",
  81. "code": 1,
  82. "nowTime": "2024-05-10 15:57:42"
  83. }
  84. */
  85. _, err := util.PostJsonResponseJson(
  86. //"http://1.202.169.139:8081/project/task/callback",
  87. //"http://10.14.86.127:9081/project/task/callback",
  88. infra.ApplicationYaml.K8s.CallbackUri,
  89. map[string]string{
  90. "taskId": taskId,
  91. "state": state,
  92. },
  93. )
  94. if err != nil {
  95. infra.GlobalLogger.Error(err)
  96. c.String(http.StatusOK, "false")
  97. }
  98. c.String(http.StatusOK, "true")
  99. }