LingxinMeng 1 년 전
부모
커밋
8aed2e7b8c

+ 27 - 1
amd64/dispatch_server/package/domain/comm_with_redis.go

@@ -98,7 +98,7 @@ func AddWaitingCluster(userId string, userParallelism int64, algorithmObjectKey
 	return nil
 }
 
-func AddRunningCluster(taskCache entity.TaskCache) error {
+func AddRunningCluster(taskCache entity.TaskCache, nodeName string) error {
 	taskCacheTemp := taskCache
 	// 转 json
 	taskCacheJson, err := TaskCacheToJson(taskCacheTemp)
@@ -110,6 +110,24 @@ func AddRunningCluster(taskCache entity.TaskCache) error {
 	if err != nil {
 		return errors.New("任务缓存对象json " + taskCacheJson + " 添加到集群等待队列失败,错误信息为: " + util.ToString(err))
 	}
+
+	// 设置任务和节点映射
+	err = infra.GlobalRedisClient.Set(global.KeyTaskToNode+":"+taskCache.Task.Info.TaskId, nodeName, -1).Err()
+	if err != nil {
+		return errors.New("设置任务与节点关联键值对失败,错误信息为: " + util.ToString(err))
+	}
+	// 减少并行度
+	global.ParallelismMutex.Lock()
+	gpuNodeJsons, _ := infra.GlobalRedisClient.LRange(global.KeyGpuNodeList, 0, -1).Result()
+	for i, gpuNodeJson := range gpuNodeJsons {
+		node, _ := JsonToGpuNode(gpuNodeJson)
+		if node.Hostname == nodeName {
+			node.Parallelism--
+			nodeJson, _ := GpuNodeToJson(node)
+			_, _ = infra.GlobalRedisClient.LSet(global.KeyGpuNodeList, int64(i), nodeJson).Result()
+		}
+	}
+	global.ParallelismMutex.Unlock()
 	return nil
 }
 
@@ -132,3 +150,11 @@ func JsonToGpuNode(jsonData string) (infra.GpuNode, error) {
 	}
 	return taskCache, nil
 }
+
+func GpuNodeToJson(gpuNode infra.GpuNode) (string, error) {
+	jsonData, err := json.MarshalIndent(gpuNode, "", "    ")
+	if err != nil {
+		return "", err
+	}
+	return string(jsonData), nil
+}

+ 3 - 0
amd64/dispatch_server/package/global/redis_key.go

@@ -10,4 +10,7 @@ var (
 	KeyTaskQueueWaitingUser       = "task-queue-waiting-user"    // 用户等待队列,等待队列不需要根据userId区分开,资源先到先得
 	KeyTaskQueueRunningCluster    = "task-queue-running-cluster" // 集群运行队列
 	KeyTaskQueueWaitingCluster    = "task-queue-waiting-cluster" // 集群等待队列
+	KeyTaskToNode                 = "task-to-node"               // 集群等待队列
+
+	ParallelismMutex sync.Mutex
 )

+ 34 - 1
amd64/dispatch_server/package/handler/old_interface_adapter.go

@@ -1,7 +1,10 @@
 package handler
 
 import (
+	"cicv-data-closedloop/amd64/dispatch_server/package/domain"
+	"cicv-data-closedloop/amd64/dispatch_server/package/global"
 	"cicv-data-closedloop/amd64/dispatch_server/package/infra"
+	"cicv-data-closedloop/amd64/dispatch_server/package/service"
 	"cicv-data-closedloop/amd64/dispatch_server/package/util"
 	"github.com/gin-gonic/gin"
 	"net/http"
@@ -23,8 +26,38 @@ func State(c *gin.Context) {
 	if state != "Running" {
 		err := util.DeletePod(infra.ClientSet, podName, infra.ApplicationYaml.K8s.NamespaceName)
 		if err != nil {
-			infra.GlobalLogger.Infof("删除pod【%v】失败", podName)
+			infra.GlobalLogger.Errorf("删除pod【%v】失败", podName)
 		}
+		// 根据taskId删除缓存中的pod元素
+		runningTaskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueRunningCluster, 0, -1).Result()
+		if err != nil {
+			infra.GlobalLogger.Errorf("获取运行中的任务队列【%v】失败", global.KeyTaskQueueRunningCluster)
+		}
+		for _, json := range runningTaskCacheJsons {
+			cache, err := service.JsonToTaskCache(json)
+			if err != nil {
+				infra.GlobalLogger.Error(err)
+			}
+			if cache.Task.Info.TaskId == taskId {
+				// 1 获取任务id关联的节点
+				nodeName, _ := infra.GlobalRedisClient.Get(global.KeyTaskToNode + ":" + taskId).Result()
+				// 2 归还并行度
+				global.ParallelismMutex.Lock()
+				gpuNodeJsons, _ := infra.GlobalRedisClient.LRange(global.KeyGpuNodeList, 0, -1).Result()
+				for i, gpuNodeJson := range gpuNodeJsons {
+					node, _ := domain.JsonToGpuNode(gpuNodeJson)
+					if node.Hostname == nodeName {
+						node.Parallelism++
+						nodeJson, _ := domain.GpuNodeToJson(node)
+						_, _ = infra.GlobalRedisClient.LSet(global.KeyGpuNodeList, int64(i), nodeJson).Result()
+					}
+				}
+				global.ParallelismMutex.Unlock()
+				// 3 删除任务队列中的元素
+				_, _ = infra.GlobalRedisClient.LRem(global.KeyTaskQueueRunningCluster, 0, json).Result()
+			}
+		}
+
 	}
 
 	// 发送http请求到索为系统

+ 1 - 1
amd64/dispatch_server/package/service/run_task.go

@@ -97,7 +97,7 @@ func RunWaitingCluster() {
 				infra.GlobalLogger.Error(err)
 				continue
 			}
-			err = domain.AddRunningCluster(firstTaskCache)
+			err = domain.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
 			if err != nil {
 				infra.GlobalLogger.Error(err)
 				continue