123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package handler
- import (
- "dcl_dispatch_server/src/package/domain"
- "dcl_dispatch_server/src/package/domain/task_cache"
- "dcl_dispatch_server/src/package/entity"
- "dcl_dispatch_server/src/package/global"
- "dcl_dispatch_server/src/package/infra"
- "dcl_dispatch_server/src/package/util"
- "github.com/gin-gonic/gin"
- "net/http"
- )
- // Confirm 返回 true(用来确认任务是否执行的,直接返回 true 即可)
- func Confirm(c *gin.Context) {
- c.String(http.StatusOK, "true")
- }
- // Tick 接收心跳,此处没有其他处理
- func Tick(c *gin.Context) {
- c.String(http.StatusOK, "true")
- }
- func State(c *gin.Context) {
- taskId := c.Query("taskId")
- state := c.Query("state")
- podName := c.Query("podName")
- if taskId == "" || state == "" || podName == "" {
- infra.GlobalLogger.Error("任务状态接口请求参数错误,需要GET请求和【taskId】【state】【podName】三个参数。")
- c.JSON(http.StatusBadRequest, entity.HttpResult{Status: false, Code: "1003", Message: "请求参数格式错误。"})
- return
- }
- if state != "Running" {
- err := util.DeletePod(infra.ClientSet, podName, infra.ApplicationYaml.K8s.NamespaceName)
- if err != nil {
- infra.GlobalLogger.Errorf("删除pod【%v】失败", podName)
- }
- // 根据taskId删除缓存中的pod元素
- runningTaskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueRunningCluster, 0, -1).Result()
- if err != nil {
- infra.GlobalLogger.Errorf("获取运行中的任务队列【%v】失败", global.KeyTaskQueueRunningCluster)
- }
- for _, json := range runningTaskCacheJsons {
- cache, err := task_cache.JsonToTaskCache(json)
- if err != nil {
- infra.GlobalLogger.Error(err)
- }
- if cache.Task.Info.TaskId == taskId {
- // 1 获取任务id关联的节点
- nodeName, _ := infra.GlobalRedisClient.Get(global.KeyTaskToNode + ":" + taskId).Result()
- // 2 归还并行度
- global.ParallelismMutex.Lock()
- gpuNodeJsons, _ := infra.GlobalRedisClient.LRange(global.KeyGpuNodeList, 0, -1).Result()
- for i, gpuNodeJson := range gpuNodeJsons {
- node, _ := domain.JsonToGpuNode(gpuNodeJson)
- if node.Hostname == nodeName {
- node.Parallelism++
- nodeJson, _ := domain.GpuNodeToJson(node)
- _, _ = infra.GlobalRedisClient.LSet(global.KeyGpuNodeList, int64(i), nodeJson).Result()
- }
- }
- global.ParallelismMutex.Unlock()
- // 3 删除任务队列中的元素
- _, _ = infra.GlobalRedisClient.LRem(global.KeyTaskQueueRunningCluster, 0, json).Result()
- }
- }
- }
- // 发送http请求到索为系统
- /*
- 仿真测试任务回调接口:
- 1、接口url
- http://1.202.169.139:8081/project/task/callback
- 2、请求方式
- POST
- 3、请求体
- {
- "taskId":1764907332423110657,
- "state":"RUNNING"
- }
- 4、响应结果
- {
- "data":true/false,
- "success": true,
- "message": "ok",
- "code": 1,
- "nowTime": "2024-05-10 15:57:42"
- }
- */
- _, err := util.PostJsonResponseJson(
- //"http://1.202.169.139:8081/project/task/callback",
- //"http://10.14.86.127:9081/project/task/callback",
- infra.ApplicationYaml.K8s.CallbackUri,
- map[string]string{
- "taskId": taskId,
- "state": state,
- },
- )
- if err != nil {
- infra.GlobalLogger.Error(err)
- c.String(http.StatusOK, "false")
- }
- c.String(http.StatusOK, "true")
- }
|