package handler

import (
	"net/http"

	"dcl_dispatch_server/src/package/domain/task_cache"
	"dcl_dispatch_server/src/package/infra"
	"dcl_dispatch_server/src/package/infra/entity"
	"dcl_dispatch_server/src/package/infra/global"
	"dcl_dispatch_server/src/package/util"

	"github.com/gin-gonic/gin"
)

// Confirm replies with the plain-text body "true". It is used to confirm
// whether a task should be executed; answering "true" unconditionally is
// sufficient.
func Confirm(c *gin.Context) {
	c.String(http.StatusOK, "true")
}

// Tick receives heartbeats. No further processing is done here; it always
// replies with the plain-text body "true".
func Tick(c *gin.Context) {
	c.String(http.StatusOK, "true")
}

// State handles a task state report carried in the query parameters
// "taskId", "state" and "podName".
//
// If any of the three parameters is empty it answers 400 with an
// entity.HttpResult (code "1003") and stops. Otherwise:
//
//  1. When state is anything other than "Running", the pod is deleted and the
//     task's entry in the running queue is cleaned up (see
//     releaseTaskResources). Failures in this step are logged and do not
//     abort the request.
//  2. The task id and state are forwarded as JSON to the callback URI from
//     the application configuration.
//
// The response is always HTTP 200 with the plain-text body "true" when the
// callback request succeeded and "false" when it returned an error.
func State(c *gin.Context) {
	taskId := c.Query("taskId")
	state := c.Query("state")
	podName := c.Query("podName")
	if taskId == "" || state == "" || podName == "" {
		infra.GlobalLogger.Error("任务状态接口请求参数错误,需要GET请求和【taskId】【state】【podName】三个参数。")
		c.JSON(http.StatusBadRequest, entity.HttpResult{Status: false, Code: "1003", Message: "请求参数格式错误。"})
		return
	}

	if state != "Running" {
		if err := util.DeletePod(infra.ClientSet, podName, infra.ApplicationYaml.K8s.NamespaceName); err != nil {
			infra.GlobalLogger.Errorf("删除pod【%v】失败", podName)
		}
		// Remove the cached queue element(s) belonging to this task id.
		releaseTaskResources(taskId)
	}

	// Send an HTTP request to the external system's simulation-test task
	// callback interface:
	//
	//  1. URL:     http://1.202.169.139:8081/project/task/callback
	//  2. Method:  POST
	//  3. Request body:
	//       { "taskId":1764907332423110657, "state":"RUNNING" }
	//  4. Response:
	//       { "data":true/false, "success": true, "message": "ok",
	//         "code": 1, "nowTime": "2024-05-10 15:57:42" }
	_, err := util.PostJsonResponseJson(
		//"http://1.202.169.139:8081/project/task/callback",
		//"http://10.14.86.127:9081/project/task/callback",
		infra.ApplicationYaml.K8s.CallbackUri,
		map[string]string{
			"taskId": taskId,
			"state":  state,
		},
	)
	if err != nil {
		infra.GlobalLogger.Error(err)
		c.String(http.StatusOK, "false")
		// Stop here so that "true" is not appended to the same response body.
		return
	}
	c.String(http.StatusOK, "true")
}

// releaseTaskResources scans the running-task queue in Redis and, for every
// entry whose task id equals taskID, gives one unit of parallelism back to
// the node recorded for the task and removes the entry from the queue.
//
// The cleanup is best-effort: every failure is logged and the remaining steps
// are still attempted where they do not depend on the failed one.
func releaseTaskResources(taskID string) {
	runningTaskCacheJSONs, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueRunningCluster, 0, -1).Result()
	if err != nil {
		infra.GlobalLogger.Errorf("获取运行中的任务队列【%v】失败", global.KeyTaskQueueRunningCluster)
		return
	}

	for _, cacheJSON := range runningTaskCacheJSONs {
		cache, err := task_cache.JsonToTaskCache(cacheJSON)
		if err != nil {
			// An entry that cannot be decoded cannot be matched against the
			// task id, so skip it instead of reading an invalid result.
			infra.GlobalLogger.Error(err)
			continue
		}
		if cache.Task.Info.TaskId != taskID {
			continue
		}

		// 1. Look up the node associated with the task id.
		nodeName, err := infra.GlobalRedisClient.Get(global.KeyTaskToNode + ":" + taskID).Result()
		if err != nil {
			// Without a node name there is nothing to give parallelism back
			// to; the queue entry is still removed below.
			infra.GlobalLogger.Errorf("获取任务【%v】关联的节点失败: %v", taskID, err)
		} else {
			// 2. Return the parallelism to that node.
			restoreNodeParallelism(nodeName)
		}

		// 3. Remove the element from the running-task queue (count 0 removes
		// every element equal to cacheJSON).
		if _, err := infra.GlobalRedisClient.LRem(global.KeyTaskQueueRunningCluster, 0, cacheJSON).Result(); err != nil {
			infra.GlobalLogger.Errorf("从运行队列中删除任务【%v】失败: %v", taskID, err)
		}
	}
}

// restoreNodeParallelism increments the Parallelism counter of every entry in
// the GPU node list whose Hostname equals nodeName and writes the updated
// entry back at the same list index.
//
// The read-modify-write runs under global.ParallelismMutex; the deferred
// unlock guarantees the mutex is released even if decoding panics.
func restoreNodeParallelism(nodeName string) {
	global.ParallelismMutex.Lock()
	defer global.ParallelismMutex.Unlock()

	gpuNodeJSONs, err := infra.GlobalRedisClient.LRange(global.KeyGpuNodeList, 0, -1).Result()
	if err != nil {
		infra.GlobalLogger.Errorf("获取GPU节点列表【%v】失败: %v", global.KeyGpuNodeList, err)
		return
	}

	for i, gpuNodeJSON := range gpuNodeJSONs {
		node, err := infra.JsonToGpuNode(gpuNodeJSON)
		if err != nil {
			infra.GlobalLogger.Errorf("解析GPU节点失败: %v", err)
			continue
		}
		if node.Hostname != nodeName {
			continue
		}

		node.Parallelism++
		nodeJSON, err := infra.GpuNodeToJson(node)
		if err != nil {
			// Do not overwrite the stored entry with an invalid payload.
			infra.GlobalLogger.Errorf("序列化GPU节点【%v】失败: %v", nodeName, err)
			continue
		}
		if _, err := infra.GlobalRedisClient.LSet(global.KeyGpuNodeList, int64(i), nodeJSON).Result(); err != nil {
			infra.GlobalLogger.Errorf("更新GPU节点【%v】并行度失败: %v", nodeName, err)
		}
	}
}