|
@@ -2,12 +2,11 @@ package service
|
|
|
|
|
|
import (
|
|
|
"dcl_dispatch_server/src/package/domain"
|
|
|
- "dcl_dispatch_server/src/package/entity"
|
|
|
+ "dcl_dispatch_server/src/package/domain/project"
|
|
|
+ "dcl_dispatch_server/src/package/domain/task_cache"
|
|
|
"dcl_dispatch_server/src/package/global"
|
|
|
"dcl_dispatch_server/src/package/infra"
|
|
|
"dcl_dispatch_server/src/package/util"
|
|
|
- "encoding/json"
|
|
|
- "errors"
|
|
|
"fmt"
|
|
|
"path/filepath"
|
|
|
"strconv"
|
|
@@ -36,7 +35,7 @@ func RunWaitingUser() {
|
|
|
}
|
|
|
|
|
|
for _, taskCacheJson := range taskCacheJsons {
|
|
|
- taskCache, err := JsonToTaskCache(taskCacheJson)
|
|
|
+ taskCache, err := task_cache.JsonToTaskCache(taskCacheJson)
|
|
|
if err != nil {
|
|
|
infra.GlobalLogger.Error(err)
|
|
|
continue
|
|
@@ -48,7 +47,7 @@ func RunWaitingUser() {
|
|
|
task := taskCache.Task
|
|
|
// 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
|
|
|
if domain.CanRunUser(userId, userParallelism) { // 可以运行
|
|
|
- err = domain.AddWaitingCluster(&entity.Project{
|
|
|
+ err = domain.AddWaitingCluster(&project.Project{
|
|
|
UserId: userId,
|
|
|
Parallelism: userParallelism,
|
|
|
AlgorithmObjectKey: algorithmObjectKey,
|
|
@@ -83,7 +82,7 @@ func RunWaitingCluster() {
|
|
|
global.GpuNodeListMutex.Unlock()
|
|
|
continue
|
|
|
}
|
|
|
- firstTaskCache := entity.TaskCache{}
|
|
|
+ firstTaskCache := task_cache.TaskCache{}
|
|
|
algorithmTarName := ""
|
|
|
algorithmTarPath := ""
|
|
|
algorithmImageName := ""
|
|
@@ -108,7 +107,7 @@ func RunWaitingCluster() {
|
|
|
global.GpuNodeListMutex.Unlock()
|
|
|
continue
|
|
|
}
|
|
|
- firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
|
|
|
+ firstTaskCache, err = task_cache.JsonToTaskCache(firstTaskCacheJson)
|
|
|
if err != nil {
|
|
|
infra.GlobalLogger.Error(err)
|
|
|
global.GpuNodeListMutex.Unlock()
|
|
@@ -207,12 +206,11 @@ func RunWaitingCluster() {
|
|
|
// --------------- 发送 kafka 消息(获取偏移量和分区) ---------------
|
|
|
// 获取任务消息转json
|
|
|
firstTaskCache.Task.Vehicle.Sensors.Camera = global.DefaultCameras
|
|
|
- taskJson, err := TaskToJson(firstTaskCache.Task)
|
|
|
+ taskJson, err := project.TaskToJson(firstTaskCache.Task)
|
|
|
if err != nil {
|
|
|
infra.GlobalLogger.Error(err)
|
|
|
continue
|
|
|
}
|
|
|
-
|
|
|
// ------- 发送 -------
|
|
|
topic := projectId
|
|
|
// 创建一个Message,并指定分区为0
|
|
@@ -340,23 +338,3 @@ func RunWaitingCluster() {
|
|
|
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
|
|
|
- // 创建一个 Person 类型的变量
|
|
|
- var taskCache entity.TaskCache
|
|
|
-
|
|
|
- // 使用 json.Unmarshal 解析 JSON 字符串到结构体
|
|
|
- err := json.Unmarshal([]byte(jsonData), &taskCache)
|
|
|
- if err != nil {
|
|
|
- return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
|
|
|
- }
|
|
|
- return taskCache, nil
|
|
|
-}
|
|
|
-
|
|
|
-func TaskToJson(task entity.Task) (string, error) {
|
|
|
- jsonData, err := json.Marshal(task)
|
|
|
- if err != nil {
|
|
|
- return "", errors.New("转json失败,错误信息为:" + err.Error())
|
|
|
- }
|
|
|
- return string(jsonData), nil
|
|
|
-}
|