LingxinMeng 1 жил өмнө
parent
commit
4f0186c04d

+ 1 - 0
amd64/dispatch_server/package/domain/comm_with_k8s.go

@@ -0,0 +1 @@
+package domain

+ 85 - 0
amd64/dispatch_server/package/domain/comm_with_redis.go

@@ -0,0 +1,85 @@
+package domain
+
+import (
+	"cicv-data-closedloop/amd64/dispatch_server/package/entity"
+	"cicv-data-closedloop/amd64/dispatch_server/package/global"
+	"cicv-data-closedloop/amd64/dispatch_server/package/infra"
+	"cicv-data-closedloop/amd64/dispatch_server/package/util"
+	"encoding/json"
+	"errors"
+)
+
+func CanRun(userId string, userParallelism int64) bool {
+	TaskQueueRunningUserNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueRunningUserPrefix + ":" + userId).Result()
+	if TaskQueueRunningUserNumber < userParallelism {
+		return true
+	}
+	return false
+}
+
+func AddWaitingUser(userId string, userParallelism int64, task entity.Task) error {
+	taskCacheTemp := entity.TaskCache{
+		UserId:          userId,
+		UserParallelism: userParallelism,
+		Task:            task,
+	}
+	// 转 json
+	taskCacheJson, err := TaskCacheToJson(taskCacheTemp)
+	if err != nil {
+		return errors.New("任务缓存对象" + util.ToString(taskCacheTemp) + " 转 json 失败,错误信息为: " + util.ToString(err))
+	}
+	// 添加到用户等待队列
+	key := global.KeyTaskQueueWaitingUser
+	err = infra.GlobalRedisClient.RPush(key, taskCacheJson).Err()
+	if err != nil {
+		return errors.New("任务缓存对象json " + taskCacheJson + "  " + key + " 失败,错误信息为: " + util.ToString(err))
+	}
+	return nil
+}
+
+func DeleteWaitingUser(taskCacheJson string) error {
+	// 定义要删除的列表和值
+	key := global.KeyTaskQueueWaitingUser
+	value := taskCacheJson
+	// 使用 LREM 命令删除列表中的特定值
+	_, err := infra.GlobalRedisClient.LRem(key, 0, value).Result()
+	if err != nil {
+		return errors.New("删除用户等待队列中的值 " + taskCacheJson + " 失败,错误信息为: " + util.ToString(err))
+	}
+	return nil
+}
+
+func AddWaitingCluster(userId string, userParallelism int64, task entity.Task) error {
+	taskCacheTemp := entity.TaskCache{
+		UserId:          userId,
+		UserParallelism: userParallelism,
+		Task:            task,
+	}
+	// 转 json
+	taskCacheJson, err := TaskCacheToJson(taskCacheTemp)
+	if err != nil {
+		return errors.New("任务缓存对象" + util.ToString(taskCacheTemp) + " 转 json 失败,错误信息为: " + util.ToString(err))
+	}
+	// 添加到集群等待队列
+	err = infra.GlobalRedisClient.RPush(global.KeyTaskQueueWaitingCluster, taskCacheJson).Err()
+	if err != nil {
+		return errors.New("任务缓存对象json " + taskCacheJson + " 添加到集群等待队列失败,错误信息为: " + util.ToString(err))
+	}
+	return nil
+}
+
+func TaskToJson(task entity.Task) (string, error) {
+	jsonData, err := json.MarshalIndent(task, "", "    ")
+	if err != nil {
+		return "", err
+	}
+	return string(jsonData), nil
+}
+
+func TaskCacheToJson(taskCache entity.TaskCache) (string, error) {
+	jsonData, err := json.MarshalIndent(taskCache, "", "    ")
+	if err != nil {
+		return "", err
+	}
+	return string(jsonData), nil
+}

+ 1 - 1
amd64/dispatch_server/package/entity/project.go

@@ -4,6 +4,6 @@ type Project struct {
 	ProjectId          string `json:"projectId"`
 	AlgorithmObjectKey string `json:"algorithmObjectKey"`
 	UserId             string `json:"userId"`
-	Parallelism        int    `json:"parallelism"`
+	Parallelism        int64  `json:"parallelism"`
 	Tasks              []Task `json:"tasks"`
 }

+ 7 - 0
amd64/dispatch_server/package/entity/task_cache.go

@@ -0,0 +1,7 @@
+package entity
+
+type TaskCache struct {
+	UserId          string `json:"userId"`
+	UserParallelism int64  `json:"userParallelism"`
+	Task            Task   `json:"task"`
+}

+ 8 - 0
amd64/dispatch_server/package/global/redis_key.go

@@ -0,0 +1,8 @@
+package global
+
+var (
+	KeyTaskQueueRunningUserPrefix = "task-queue-running-user"    // 用户运行队列
+	KeyTaskQueueWaitingUser       = "task-queue-waiting-user"    // 用户等待队列,等待队列不需要根据userId区分开,资源先到先得
+	KeyTaskQueueRunningCluster    = "task-queue-running-cluster" // 集群运行队列
+	KeyTaskQueueWaitingCluster    = "task-queue-waiting-cluster" // 集群等待队列
+)

+ 5 - 0
amd64/dispatch_server/package/global/run_task_sync.go

@@ -0,0 +1,5 @@
+package global
+
+import "sync"
+
+var RunTaskMutex sync.Mutex

+ 1 - 0
amd64/dispatch_server/package/handler/receive_task_state.go

@@ -0,0 +1 @@
+package handler

+ 132 - 121
amd64/dispatch_server/package/handler/start_project.go

@@ -1,121 +1,122 @@
 package handler
 
 import (
+	"cicv-data-closedloop/amd64/dispatch_server/package/domain"
 	entity2 "cicv-data-closedloop/amd64/dispatch_server/package/entity"
+	"cicv-data-closedloop/amd64/dispatch_server/package/global"
+	"cicv-data-closedloop/amd64/dispatch_server/package/infra"
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/entity"
 	"github.com/gin-gonic/gin"
 	"net/http"
 )
 
-/**
- * {
- *     "info": {
- *         "project_id": 766,
- *         "task_id": 2789,
- *         "task_path": "jobs/RT_EY%402022-01-25_13%3A57%3A30_1643090250/SharingVan_20220125_0",
- *         "default_time": 30
- *     },
- *     "scenario": {
- *         "scenario_osc": "scenarios/SharingVAN_20220125/SharingVAN.xml",
- *         "scenario_odr": "scenarios/SharingVAN_20220125/Road_DFMC.xodr",
- *         "scenario_osgb": "scenarios/SharingVAN_20220125/Road_DFMC.opt.osgb"
- *     },
- *     "vehicle":{
- * 		"model": {
- * 			"model_label": "AudiA6_10"
- *                },
- * 		"dynamics": {
- * 			"dynamics_maxspeed": 67,
- * 			"dynamics_enginepower": 150000,
- * 			"dynamics_maxdecel": 9.5,
- * 			"dynamics_maxsteering": 0.48,
- * 			"dynamics_mass": 1700,
- * 			"dynamics_frontsurfaceeffective": 2.2,
- * 			"dynamics_airdragcoefficient": 0.31,
- * 			"dynamics_rollingresistance": 0,
- * 			"dynamics_wheeldiameter": 0.684,
- * 			"dynamics_wheeldrive": "wheel_drive_front",
- * 			"dynamics_overallefficiency": 0.75,
- * 			"dynamics_distfront": 3.838,
- * 			"dynamics_distrear": 1.086,
- * 			"dynamics_distleft": 0.94,
- * 			"dynamics_distright": 0.94,
- * 			"dynamics_distheight": 1.444,
- * 			"dynamics_wheelbase": 2.91
- *        },
- * 		"sensors": {
- * 			"camera": [
- *                {
- * 					"sensor_name": "following_view_camera",
- * 					"sensor_fovH": 45,
- * 					"sensor_fovV": 27,
- * 					"sensor_near": 1,
- * 					"sensor_far": 1500,
- * 					"sensor_resolution": "800x600",
- * 					"sensor_frameRate": 60,
- * 					"sensor_x": -15,
- * 					"sensor_y": 0,
- * 					"sensor_z": 5,
- * 					"sensor_h": 0,
- * 					"sensor_p": 10,
- * 					"sensor_r": 0
- *                },
- *                {
- * 					"sensor_name": "plan_view_camera",
- * 					"sensor_fovH": 45,
- * 					"sensor_fovV": 27,
- * 					"sensor_near": 1,
- * 					"sensor_far": 1500,
- * 					"sensor_resolution": "800x600",
- * 					"sensor_frameRate": 60,
- * 					"sensor_x": 35,
- * 					"sensor_y": 0,
- * 					"sensor_z": 200,
- * 					"sensor_h": 0,
- * 					"sensor_p": 90,
- * 					"sensor_r": 0
- *                }
- * 			],
- * 			"OGT": [
- *                {
- * 					"sensor_name": "360_perfect_sensor",
- * 					"sensor_fovH": 360,
- * 					"sensor_fovV": 20,
- * 					"sensor_near": 0,
- * 					"sensor_far": 100,
- * 					"sensor_x": 0,
- * 					"sensor_y": 0,
- * 					"sensor_z": 1.7,
- * 					"sensor_h": 0,
- * 					"sensor_p": 0,
- * 					"sensor_r": 0,
- * 					"sensor_filter": [0,1,5],
- * 					"sensor_display": true,
- * 					"sensor_maxObjects": 10,
- * 					"sensor_port": 10
- *                },
- *                {
- * 					"sensor_name": "120_perfect_sensor",
- * 					"sensor_fovH": 120,
- * 					"sensor_fovV": 20,
- * 					"sensor_near": 0,
- * 					"sensor_far": 100,
- * 					"sensor_x": 0,
- * 					"sensor_y": 0,
- * 					"sensor_z": 1.7,
- * 					"sensor_h": 0,
- * 					"sensor_p": 0,
- * 					"sensor_r": 0,
- * 					"sensor_filter": [0,1,5],
- * 					"sensor_display": true,
- * 					"sensor_maxObjects": 10,
- * 					"sensor_port": 10
- *                }
- * 			]
- *        }* 	}
- * }
- */
+/*
+{
+  "projectId": "项目ID",
+  "algorithmObjectKey": "算法在阿里云的存储路径",
+  "userId": "用户ID",
+  "parallelism": 10,
+  "tasks": [
+    {
+      "info": {
+        "project_id": "项目ID",
+        "task_id": "任务ID",
+        "task_path": "任务结果路径",
+        "default_time": "最大仿真时间"
+      },
+      "scenario": {
+        "scenario_osc": "xosc或xml路径",
+        "scenario_odr": "xodr路径",
+        "scenario_osgb": "osgb路径"
+      },
+      "vehicle": {
+        "model": {
+          "model_label": "模型标签"
+        },
+        "dynamics": {
+            "dynamics_maxspeed": 0.0, //最大速度(千米/小时)
+            "dynamics_enginepower": 0.0, // 发动机功率(千瓦)
+            "dynamics_maxdecel": 0.0, // 最大减速度(米/秒2)
+            "dynamics_maxsteering": 0.0, // 最大减速度(米/秒2)
+            "dynamics_mass": 0.0, // 质量(千克)
+            "dynamics_frontsurfaceeffective": 0.0, // 前表面有效面积(平方米)
+            "dynamics_airdragcoefficient": 0.0, // 空气阻力系数
+            "dynamics_rollingresistance": 0.0, // 滚动阻力系数
+            "dynamics_wheeldiameter": 0.0, // 车轮直径(米)
+            "dynamics_wheeldrive": "wheel_drive_front", // 驱动方式
+            "dynamics_overallefficiency": 0.0, // 总效率
+            "dynamics_distfront": 0.0, // 车前距(米)
+            "dynamics_distrear": 0.0, // 车后距(米)
+            "dynamics_distleft": 0.0, // 车左距(米)
+            "dynamics_distright": 0.0, // 车右距(米)
+            "dynamics_distheight": 0.0, // 车高(米)
+            "dynamics_wheelbase": 0.0 // 轴距(米)
+        },
+        "sensors": {
+          "camera": [
+            {
+              "sensor_name": "",
+              "sensor_near": 0,
+              "sensor_far": 0,
+              "sensor_x": 0,
+              "sensor_y": 0,
+              "sensor_z": 0,
+              "sensor_h": 0,
+              "sensor_p": 0,
+              "sensor_r": 0,
+              "sensor_fovH": 0,	// 水平视场角
+              "sensor_fovV": 0,	// 垂直视场角
+              "sensor_resolution": 0, // 水平视场角偏移量
+              "sensor_frameRate": 0	// 垂直视场角偏移量
+            }
+          ],
+          "OGT": [
+            {
+                "sensor_name": "", // 传感器名称
+                "sensor_near": 0, // 盲区距离
+                "sensor_far": 0, // 探测距离
+                "sensor_x": 0, // 传感器横向偏移量(x轴)
+                "sensor_y": 0, // 传感器纵向偏移量(y轴)
+                "sensor_z": 0, // 传感器安装高度(z轴)
+                "sensor_h": 0, // 传感器横摆角
+                "sensor_p": 0, // 传感器俯仰角
+                "sensor_r": 0, // 传感器横滚角
+                "sensor_fovHLeft": 0, // 水平现场角左
+                "sensor_fovHRight": 0, // 水平现场角右
+                "sensor_fovVTop": 0, // 垂直现场角顶
+                "sensor_fovVBottom": 0, // 垂直现场角底
+                "sensor_filter": "0,5,6", // 目标物筛选序列(0,1,2)
+                "sensor_display": false, // 显示目标物
+                "sensor_maxObjects": 0, // 最大目标物个数
+                "sensor_port": 0 // 端口
+            },
+            {
+              "sensor_name": "",
+              "sensor_near": 0,
+              "sensor_far": 0,
+              "sensor_x": 0,
+              "sensor_y": 0,
+              "sensor_z": 0,
+              "sensor_h": 0,
+              "sensor_p": 0,
+              "sensor_r": 0,
+              "sensor_fovHLeft": 0,
+              "sensor_fovHRight": 0,
+              "sensor_fovVTop": 0,
+              "sensor_fovVBottom": 0,
+              "sensor_filter": "1,2,3,4",
+              "sensor_display": false,
+              "sensor_maxObjects": 0,
+              "sensor_port": 0
+            }
+          ]
+        }
+      }
+    }
+  ]
+}
+*/
 
 func StartProject(c *gin.Context) {
 	projectStartParam := new(entity2.Project)
@@ -124,21 +125,31 @@ func StartProject(c *gin.Context) {
 		c.JSON(http.StatusBadRequest, entity.HttpResult{Status: false, Code: "1003", Message: "请求参数格式错误。"})
 		return
 	}
-	// 1 维护一个运行任务队列,绑定用户id和节点名称,供下面两个判断同时使用(使用redis的队列)
-	// 2 获取正在运行的任务数量,在缓存中
-	// 3 判断用户并行度
-	{
-		// 1 获取接收到的任务数量
-		//taskCount := len(projectStartParam.Tasks)
-
-		// 2 判断是否可以运行,可以运行的加入中间队列进行集群并行度判断,不能运行的加入用户等待队列
+	// ------------ 维护一个运行任务队列,绑定用户id和节点名称,供下面两个判断同时使用(使用redis的队列)
+	userId := projectStartParam.UserId               // 用户ID
+	taskReceived := projectStartParam.Tasks          // 接收到的所有任务
+	userParallelism := projectStartParam.Parallelism // 用户的并行度上限
 
+	// 1 判断用户并行度
+	for _, task := range taskReceived {
+		global.RunTaskMutex.Lock()
+		// 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
+		if domain.CanRun(userId, userParallelism) { // 可以运行
+			err := domain.AddWaitingCluster(userId, userParallelism, task)
+			if err != nil {
+				infra.GlobalLogger.Errorf("将任务 %v 添加到集群等待队列失败,错误信息为:%v", task, err)
+				continue
+			}
+		} else { // 不能运行
+			err := domain.AddWaitingUser(userId, userParallelism, task)
+			if err != nil {
+				infra.GlobalLogger.Errorf("将任务 %v 添加到集群等待队列失败,错误信息为:%v", task, err)
+				continue
+			}
+		}
+		global.RunTaskMutex.Unlock()
 	}
-	// 3 判断集群并行度
-	{
-		// 1 获取集群并行度
-		// 2 判断是否可以运行,可以运行的执行运行命令并加入运行队列,不能运行的加入集群等待队列
-	}
+
 	// 4 返回
 	c.JSON(http.StatusOK, entity.HttpResult{Status: true, Code: "2000", Message: "项目启动请求已被成功接收,等待调度处理。"})
 }

+ 2 - 2
amd64/dispatch_server/package/infra/i_redis.go

@@ -20,8 +20,8 @@ func InitRedisClient(addr string, password string, db int) {
 			DB:       db,       // Redis数据库索引
 		})
 	})
-	// 添加键值对到 Redis
-	err := GlobalRedisClient.Set("init", "success", 0).Err()
+	// 检查连接是否成功
+	_, err := GlobalRedisClient.Ping().Result()
 	if err != nil {
 		GlobalLogger.Error("初始化 Redis 客户端报错:", err)
 	}

+ 79 - 0
amd64/dispatch_server/package/service/run_task.go

@@ -0,0 +1,79 @@
+package service
+
+import (
+	"cicv-data-closedloop/amd64/dispatch_server/package/domain"
+	"cicv-data-closedloop/amd64/dispatch_server/package/entity"
+	"cicv-data-closedloop/amd64/dispatch_server/package/global"
+	"cicv-data-closedloop/amd64/dispatch_server/package/infra"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"time"
+)
+
+/*
+负责处理用户等待队列中的任务
+负责运行集群等待队列中的任务
+*/
+
+// 判断用户等待队列中的任务是否可以加入到集群等待队列
+func runWaitingUser() {
+
+	for {
+		time.Sleep(2 * time.Second)
+		global.RunTaskMutex.Lock()
+		// 获取Redis列表中的值
+		taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
+		if err != nil {
+			infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
+			continue
+		}
+
+		for _, taskCacheJson := range taskCacheJsons {
+			taskCache, err := JsonToTaskCache(taskCacheJson)
+			if err != nil {
+				infra.GlobalLogger.Error(err)
+				continue
+			}
+			userId := taskCache.UserId
+			userParallelism := taskCache.UserParallelism
+			task := taskCache.Task
+			// 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
+			if domain.CanRun(userId, userParallelism) { // 可以运行
+				err = domain.AddWaitingCluster(userId, userParallelism, task)
+				if err != nil {
+					infra.GlobalLogger.Error(err)
+					continue
+				}
+				err = domain.DeleteWaitingUser(task.Info.TaskId)
+				if err != nil {
+					infra.GlobalLogger.Error(err)
+					continue
+				}
+			}
+		}
+		global.RunTaskMutex.Unlock()
+	}
+}
+
+// 集群等待队列中的任务判断是否可以加入集群运行队列
+func runWaitingCluster() {
+	// 2 判断集群并行度
+	//{
+	//	// 1 判断是否可以运行,可以运行的加入集群运行队列,不能运行的加入集群等待队列
+	//	TaskQueueRunningClusterNumber, _ := infra.GlobalRedisClient.LLen(KeyTaskQueueRunningCluster).Result()
+	//	// 2 判断是否可以运行,可以运行的执行运行命令并加入运行队列,不能运行的加入集群等待队列
+	//}
+}
+
+func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
+	// 创建一个 Person 类型的变量
+	var taskCache entity.TaskCache
+
+	// 使用 json.Unmarshal 解析 JSON 字符串到结构体
+	err := json.Unmarshal([]byte(jsonData), &taskCache)
+	if err != nil {
+		return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
+	}
+	return taskCache, nil
+}

+ 17 - 0
amd64/dispatch_server/package/util/u_string.go

@@ -0,0 +1,17 @@
+package util
+
+import (
+	"fmt"
+	"github.com/google/uuid"
+)
+
+func ToString(value interface{}) string {
+	return fmt.Sprintf("%v", value)
+}
+
+func NewUUID() string {
+	return uuid.New().String()[:8]
+}
+func NewShortUUID() string {
+	return uuid.New().String()[:8]
+}