|
@@ -1,9 +1,7 @@
|
|
package handler
|
|
package handler
|
|
|
|
|
|
import (
|
|
import (
|
|
- "dcl_dispatch_server/src/package/domain"
|
|
|
|
"dcl_dispatch_server/src/package/entity"
|
|
"dcl_dispatch_server/src/package/entity"
|
|
- "dcl_dispatch_server/src/package/global"
|
|
|
|
"dcl_dispatch_server/src/package/infra"
|
|
"dcl_dispatch_server/src/package/infra"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/gin-gonic/gin"
|
|
"io"
|
|
"io"
|
|
@@ -118,49 +116,49 @@ import (
|
|
*/
|
|
*/
|
|
|
|
|
|
func StartProject(c *gin.Context) {
|
|
func StartProject(c *gin.Context) {
|
|
-
|
|
|
|
- projectStartParam := new(entity.Project)
|
|
|
|
- if err := c.ShouldBindJSON(&projectStartParam); err != nil {
|
|
|
|
- // 读取请求体
|
|
|
|
- bodyBytes, err := io.ReadAll(c.Request.Body)
|
|
|
|
- if err != nil {
|
|
|
|
- c.String(500, "Error reading request body")
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- // 将请求体转换为字符串
|
|
|
|
- bodyString := string(bodyBytes)
|
|
|
|
- infra.GlobalLogger.Error("项目启动接收请求参数为:", bodyString)
|
|
|
|
- infra.GlobalLogger.Error("项目启动接收请求参数报错:", err)
|
|
|
|
- c.JSON(http.StatusBadRequest, entity.HttpResult{Status: false, Code: "1003", Message: "请求参数格式错误。"})
|
|
|
|
|
|
+ // 读取请求体
|
|
|
|
+ bodyBytes, err := io.ReadAll(c.Request.Body)
|
|
|
|
+ if err != nil {
|
|
|
|
+ c.String(500, "Error reading request body")
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- // ------------ 维护一个运行任务队列,绑定用户id和节点名称,供下面两个判断同时使用(使用redis的队列)
|
|
|
|
- userId := projectStartParam.UserId // 用户ID
|
|
|
|
- taskReceived := projectStartParam.Tasks // 接收到的所有任务
|
|
|
|
- userParallelism := projectStartParam.Parallelism // 用户的并行度上限
|
|
|
|
- algorithmObjectKey := projectStartParam.AlgorithmObjectKey
|
|
|
|
|
|
+ // 将请求体转换为字符串
|
|
|
|
+ bodyString := string(bodyBytes)
|
|
|
|
+ infra.GlobalLogger.Info("项目启动接收请求参数为:", bodyString)
|
|
|
|
|
|
- // 1 判断用户并行度
|
|
|
|
- for _, task := range taskReceived {
|
|
|
|
- global.RunTaskMutex.Lock()
|
|
|
|
- // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
|
|
|
|
- if domain.CanRunUser(userId, userParallelism) { // 可以运行
|
|
|
|
- err := domain.AddWaitingCluster(userId, userParallelism, algorithmObjectKey, task)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Errorf("将任务 %v 添加到【集群等待队列】失败,错误信息为:%v", task, err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- infra.GlobalLogger.Infof("将任务 %v 添加到【集群等待队列】成功。", task.Info.TaskId)
|
|
|
|
- } else { // 不能运行
|
|
|
|
- err := domain.AddWaitingUser(userId, userParallelism, algorithmObjectKey, task)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Errorf("将任务 %v 添加到【用户等待队列】失败,错误信息为:%v", task, err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- infra.GlobalLogger.Infof("将任务 %v 添加到【用户等待队列】成功。", task.Info.TaskId)
|
|
|
|
- }
|
|
|
|
- global.RunTaskMutex.Unlock()
|
|
|
|
- }
|
|
|
|
|
|
+ //projectStartParam := new(entity.Project)
|
|
|
|
+ //if err := c.ShouldBindJSON(&projectStartParam); err != nil {
|
|
|
|
+ // infra.GlobalLogger.Error("项目启动接收请求参数报错:", err)
|
|
|
|
+ // c.JSON(http.StatusBadRequest, entity.HttpResult{Status: false, Code: "1003", Message: "请求参数格式错误。"})
|
|
|
|
+ // return
|
|
|
|
+ //}
|
|
|
|
+ //// ------------ 维护一个运行任务队列,绑定用户id和节点名称,供下面两个判断同时使用(使用redis的队列)
|
|
|
|
+ //userId := projectStartParam.UserId // 用户ID
|
|
|
|
+ //taskReceived := projectStartParam.Tasks // 接收到的所有任务
|
|
|
|
+ //userParallelism := projectStartParam.Parallelism // 用户的并行度上限
|
|
|
|
+ //algorithmObjectKey := projectStartParam.AlgorithmObjectKey
|
|
|
|
+ //
|
|
|
|
+ //// 1 判断用户并行度
|
|
|
|
+ //for _, task := range taskReceived {
|
|
|
|
+ // global.RunTaskMutex.Lock()
|
|
|
|
+ // // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
|
|
|
|
+ // if domain.CanRunUser(userId, userParallelism) { // 可以运行
|
|
|
|
+ // err := domain.AddWaitingCluster(userId, userParallelism, algorithmObjectKey, task)
|
|
|
|
+ // if err != nil {
|
|
|
|
+ // infra.GlobalLogger.Errorf("将任务 %v 添加到【集群等待队列】失败,错误信息为:%v", task, err)
|
|
|
|
+ // continue
|
|
|
|
+ // }
|
|
|
|
+ // infra.GlobalLogger.Infof("将任务 %v 添加到【集群等待队列】成功。", task.Info.TaskId)
|
|
|
|
+ // } else { // 不能运行
|
|
|
|
+ // err := domain.AddWaitingUser(userId, userParallelism, algorithmObjectKey, task)
|
|
|
|
+ // if err != nil {
|
|
|
|
+ // infra.GlobalLogger.Errorf("将任务 %v 添加到【用户等待队列】失败,错误信息为:%v", task, err)
|
|
|
|
+ // continue
|
|
|
|
+ // }
|
|
|
|
+ // infra.GlobalLogger.Infof("将任务 %v 添加到【用户等待队列】成功。", task.Info.TaskId)
|
|
|
|
+ // }
|
|
|
|
+ // global.RunTaskMutex.Unlock()
|
|
|
|
+ //}
|
|
|
|
|
|
// 4 返回
|
|
// 4 返回
|
|
c.JSON(http.StatusOK, entity.HttpResult{Status: true, Code: "2000", Message: "项目启动请求已被成功接收,等待调度处理。"})
|
|
c.JSON(http.StatusOK, entity.HttpResult{Status: true, Code: "2000", Message: "项目启动请求已被成功接收,等待调度处理。"})
|