LingxinMeng 10 ヶ月 前
コミット
75caa9740f
1 ファイル変更42 行追加41 行削除
  1. 42 41
      src/package/handler/start_project.go

+ 42 - 41
src/package/handler/start_project.go

@@ -1,10 +1,11 @@
 package handler
 
 import (
+	"dcl_dispatch_server/src/package/domain"
 	"dcl_dispatch_server/src/package/entity"
+	"dcl_dispatch_server/src/package/global"
 	"dcl_dispatch_server/src/package/infra"
 	"github.com/gin-gonic/gin"
-	"io"
 	"net/http"
 )
 
@@ -116,49 +117,49 @@ import (
 */
 
 func StartProject(c *gin.Context) {
-	// 读取请求体
-	bodyBytes, err := io.ReadAll(c.Request.Body)
-	if err != nil {
-		c.String(500, "Error reading request body")
+	//// 读取请求体
+	//bodyBytes, err := io.ReadAll(c.Request.Body)
+	//if err != nil {
+	//	c.String(500, "Error reading request body")
+	//	return
+	//}
+	//// 将请求体转换为字符串
+	//bodyString := string(bodyBytes)
+	//infra.GlobalLogger.Info("项目启动接收请求参数为:", bodyString)
+
+	projectStartParam := new(entity.Project)
+	if err := c.ShouldBindJSON(&projectStartParam); err != nil {
+		infra.GlobalLogger.Error("项目启动接收请求参数报错:", err)
+		c.JSON(http.StatusBadRequest, entity.HttpResult{Status: false, Code: "1003", Message: "请求参数格式错误。"})
 		return
 	}
-	// 将请求体转换为字符串
-	bodyString := string(bodyBytes)
-	infra.GlobalLogger.Info("项目启动接收请求参数为:", bodyString)
+	// ------------ 维护一个运行任务队列,绑定用户id和节点名称,供下面两个判断同时使用(使用redis的队列)
+	userId := projectStartParam.UserId               // 用户ID
+	taskReceived := projectStartParam.Tasks          // 接收到的所有任务
+	userParallelism := projectStartParam.Parallelism // 用户的并行度上限
+	algorithmObjectKey := projectStartParam.AlgorithmObjectKey
 
-	//projectStartParam := new(entity.Project)
-	//if err := c.ShouldBindJSON(&projectStartParam); err != nil {
-	//	infra.GlobalLogger.Error("项目启动接收请求参数报错:", err)
-	//	c.JSON(http.StatusBadRequest, entity.HttpResult{Status: false, Code: "1003", Message: "请求参数格式错误。"})
-	//	return
-	//}
-	//// ------------ 维护一个运行任务队列,绑定用户id和节点名称,供下面两个判断同时使用(使用redis的队列)
-	//userId := projectStartParam.UserId               // 用户ID
-	//taskReceived := projectStartParam.Tasks          // 接收到的所有任务
-	//userParallelism := projectStartParam.Parallelism // 用户的并行度上限
-	//algorithmObjectKey := projectStartParam.AlgorithmObjectKey
-	//
-	//// 1 判断用户并行度
-	//for _, task := range taskReceived {
-	//	global.RunTaskMutex.Lock()
-	//	// 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
-	//	if domain.CanRunUser(userId, userParallelism) { // 可以运行
-	//		err := domain.AddWaitingCluster(userId, userParallelism, algorithmObjectKey, task)
-	//		if err != nil {
-	//			infra.GlobalLogger.Errorf("将任务 %v 添加到【集群等待队列】失败,错误信息为:%v", task, err)
-	//			continue
-	//		}
-	//		infra.GlobalLogger.Infof("将任务 %v 添加到【集群等待队列】成功。", task.Info.TaskId)
-	//	} else { // 不能运行
-	//		err := domain.AddWaitingUser(userId, userParallelism, algorithmObjectKey, task)
-	//		if err != nil {
-	//			infra.GlobalLogger.Errorf("将任务 %v 添加到【用户等待队列】失败,错误信息为:%v", task, err)
-	//			continue
-	//		}
-	//		infra.GlobalLogger.Infof("将任务 %v 添加到【用户等待队列】成功。", task.Info.TaskId)
-	//	}
-	//	global.RunTaskMutex.Unlock()
-	//}
+	// 1 判断用户并行度
+	for _, task := range taskReceived {
+		global.RunTaskMutex.Lock()
+		// 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
+		if domain.CanRunUser(userId, userParallelism) { // 可以运行
+			err := domain.AddWaitingCluster(userId, userParallelism, algorithmObjectKey, task)
+			if err != nil {
+				infra.GlobalLogger.Errorf("将任务 %v 添加到【集群等待队列】失败,错误信息为:%v", task, err)
+				continue
+			}
+			infra.GlobalLogger.Infof("将任务 %v 添加到【集群等待队列】成功。", task.Info.TaskId)
+		} else { // 不能运行
+			err := domain.AddWaitingUser(userId, userParallelism, algorithmObjectKey, task)
+			if err != nil {
+				infra.GlobalLogger.Errorf("将任务 %v 添加到【用户等待队列】失败,错误信息为:%v", task, err)
+				continue
+			}
+			infra.GlobalLogger.Infof("将任务 %v 添加到【用户等待队列】成功。", task.Info.TaskId)
+		}
+		global.RunTaskMutex.Unlock()
+	}
 
 	// 4 返回
 	c.JSON(http.StatusOK, entity.HttpResult{Status: true, Code: "2000", Message: "项目启动请求已被成功接收,等待调度处理。"})