LingxinMeng hai 6 meses
pai
achega
febfecf041

+ 2 - 3
src/package/handler/old_interface_adapter.go

@@ -1,7 +1,6 @@
 package handler
 
 import (
-	"dcl_dispatch_server/src/package/domain"
 	"dcl_dispatch_server/src/package/domain/task_cache"
 	"dcl_dispatch_server/src/package/entity"
 	"dcl_dispatch_server/src/package/global"
@@ -52,10 +51,10 @@ func State(c *gin.Context) {
 				global.ParallelismMutex.Lock()
 				gpuNodeJsons, _ := infra.GlobalRedisClient.LRange(global.KeyGpuNodeList, 0, -1).Result()
 				for i, gpuNodeJson := range gpuNodeJsons {
-					node, _ := domain.JsonToGpuNode(gpuNodeJson)
+					node, _ := infra.JsonToGpuNode(gpuNodeJson)
 					if node.Hostname == nodeName {
 						node.Parallelism++
-						nodeJson, _ := domain.GpuNodeToJson(node)
+						nodeJson, _ := infra.GpuNodeToJson(node)
 						_, _ = infra.GlobalRedisClient.LSet(global.KeyGpuNodeList, int64(i), nodeJson).Result()
 					}
 				}

+ 4 - 4
src/package/handler/start_project.go

@@ -2,11 +2,11 @@ package handler
 
 import (
 	"bytes"
-	"dcl_dispatch_server/src/package/domain"
 	"dcl_dispatch_server/src/package/domain/project"
 	"dcl_dispatch_server/src/package/entity"
 	"dcl_dispatch_server/src/package/global"
 	"dcl_dispatch_server/src/package/infra"
+	"dcl_dispatch_server/src/package/infra/redis"
 	"github.com/gin-gonic/gin"
 	"io"
 	"net/http"
@@ -150,15 +150,15 @@ func StartProject(c *gin.Context) {
 	for _, task := range taskReceived {
 		global.RunTaskMutex.Lock()
 		// 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
-		if domain.CanRunUser(userId, userParallelism) { // 可以运行
-			err := domain.AddWaitingCluster(projectStartParam, task)
+		if redis.CanRunUser(userId, userParallelism) { // 可以运行
+			err := redis.AddWaitingCluster(projectStartParam, task)
 			if err != nil {
 				infra.GlobalLogger.Errorf("将任务 %v 添加到【集群等待队列】失败,错误信息为:%v", task, err)
 				continue
 			}
 			infra.GlobalLogger.Infof("将任务 %v 添加到【集群等待队列】成功。", task.Info.TaskId)
 		} else { // 不能运行
-			err := domain.AddWaitingUser(projectStartParam, task)
+			err := redis.AddWaitingUser(projectStartParam, task)
 			if err != nil {
 				infra.GlobalLogger.Errorf("将任务 %v 添加到【用户等待队列】失败,错误信息为:%v", task, err)
 				continue

+ 21 - 0
src/package/infra/i_application.go

@@ -3,6 +3,8 @@ package infra
 import (
 	"dcl_dispatch_server/src/package/util"
 	_ "embed"
+	"encoding/json"
+	"errors"
 	"fmt"
 	"gopkg.in/yaml.v2"
 )
@@ -88,3 +90,22 @@ func InitApplication() {
 	// 创建镜像下载目录
 	util.CreateDir(ApplicationYaml.K8s.AlgorithmTarTempDir)
 }
+func JsonToGpuNode(jsonData string) (GpuNode, error) {
+	// 创建一个 Person 类型的变量
+	var taskCache GpuNode
+
+	// 使用 json.Unmarshal 解析 JSON 字符串到结构体
+	err := json.Unmarshal([]byte(jsonData), &taskCache)
+	if err != nil {
+		return GpuNode{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
+	}
+	return taskCache, nil
+}
+
+func GpuNodeToJson(gpuNode GpuNode) (string, error) {
+	jsonData, err := json.MarshalIndent(gpuNode, "", "    ")
+	if err != nil {
+		return "", err
+	}
+	return string(jsonData), nil
+}

+ 4 - 26
src/package/domain/comm_with_redis.go → src/package/infra/redis/comm_with_redis.go

@@ -1,4 +1,4 @@
-package domain
+package redis
 
 import (
 	"dcl_dispatch_server/src/package/domain/project"
@@ -6,9 +6,7 @@ import (
 	"dcl_dispatch_server/src/package/global"
 	"dcl_dispatch_server/src/package/infra"
 	"dcl_dispatch_server/src/package/util"
-	"encoding/json"
 	"errors"
-	"fmt"
 )
 
 func CanRunUser(userId string, userParallelism int64) bool {
@@ -33,7 +31,7 @@ func CanRunCluster() (bool, infra.GpuNode, error) {
 	var can bool
 	maxParallelism = 0
 	for _, gpuNodeJson := range gpuNodeJsons {
-		node, err := JsonToGpuNode(gpuNodeJson)
+		node, err := infra.JsonToGpuNode(gpuNodeJson)
 		if err != nil {
 			return false, infra.GpuNode{}, err
 		}
@@ -124,10 +122,10 @@ func AddRunningCluster(taskCache task_cache.TaskCache, nodeName string) error {
 	global.ParallelismMutex.Lock()
 	gpuNodeJsons, _ := infra.GlobalRedisClient.LRange(global.KeyGpuNodeList, 0, -1).Result()
 	for i, gpuNodeJson := range gpuNodeJsons {
-		node, _ := JsonToGpuNode(gpuNodeJson)
+		node, _ := infra.JsonToGpuNode(gpuNodeJson)
 		if node.Hostname == nodeName {
 			node.Parallelism--
-			nodeJson, _ := GpuNodeToJson(node)
+			nodeJson, _ := infra.GpuNodeToJson(node)
 			_, _ = infra.GlobalRedisClient.LSet(global.KeyGpuNodeList, int64(i), nodeJson).Result()
 			infra.GlobalLogger.Infof("节点 %v 并行度减一后剩余并行度为 %v。", node.Hostname, node.Parallelism)
 		}
@@ -135,23 +133,3 @@ func AddRunningCluster(taskCache task_cache.TaskCache, nodeName string) error {
 	global.ParallelismMutex.Unlock()
 	return nil
 }
-
-func JsonToGpuNode(jsonData string) (infra.GpuNode, error) {
-	// 创建一个 Person 类型的变量
-	var taskCache infra.GpuNode
-
-	// 使用 json.Unmarshal 解析 JSON 字符串到结构体
-	err := json.Unmarshal([]byte(jsonData), &taskCache)
-	if err != nil {
-		return infra.GpuNode{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
-	}
-	return taskCache, nil
-}
-
-func GpuNodeToJson(gpuNode infra.GpuNode) (string, error) {
-	jsonData, err := json.MarshalIndent(gpuNode, "", "    ")
-	if err != nil {
-		return "", err
-	}
-	return string(jsonData), nil
-}

+ 6 - 6
src/package/service/run_task.go

@@ -1,11 +1,11 @@
 package service
 
 import (
-	"dcl_dispatch_server/src/package/domain"
 	"dcl_dispatch_server/src/package/domain/project"
 	"dcl_dispatch_server/src/package/domain/task_cache"
 	"dcl_dispatch_server/src/package/global"
 	"dcl_dispatch_server/src/package/infra"
+	"dcl_dispatch_server/src/package/infra/redis"
 	"dcl_dispatch_server/src/package/util"
 	"fmt"
 	"path/filepath"
@@ -46,8 +46,8 @@ func RunWaitingUser() {
 			equipmentType := taskCache.EquipmentType
 			task := taskCache.Task
 			// 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
-			if domain.CanRunUser(userId, userParallelism) { // 可以运行
-				err = domain.AddWaitingCluster(&project.Project{
+			if redis.CanRunUser(userId, userParallelism) { // 可以运行
+				err = redis.AddWaitingCluster(&project.Project{
 					UserId:             userId,
 					Parallelism:        userParallelism,
 					AlgorithmObjectKey: algorithmObjectKey,
@@ -57,7 +57,7 @@ func RunWaitingUser() {
 					infra.GlobalLogger.Error(err)
 					continue
 				}
-				err = domain.DeleteWaitingUser(task.Info.TaskId)
+				err = redis.DeleteWaitingUser(task.Info.TaskId)
 				if err != nil {
 					infra.GlobalLogger.Error(err)
 					continue
@@ -76,7 +76,7 @@ func RunWaitingCluster() {
 		time.Sleep(2 * time.Second)
 		global.GpuNodeListMutex.Lock()
 		// 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
-		can, gpuNode, err := domain.CanRunCluster()
+		can, gpuNode, err := redis.CanRunCluster()
 		if err != nil {
 			infra.GlobalLogger.Error(err)
 			global.GpuNodeListMutex.Unlock()
@@ -326,7 +326,7 @@ func RunWaitingCluster() {
 		// 收尾
 		{
 			// --------------- 添加到运行队列
-			err = domain.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
+			err = redis.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
 			if err != nil {
 				infra.GlobalLogger.Error(err)
 				global.GpuNodeListMutex.Unlock()