LingxinMeng 1 年間 前
コミット
9c8e304ff1

+ 41 - 62
amd64/dispatch_server/main.go

@@ -5,93 +5,72 @@ package main
 */
 
 import (
+	"cicv-data-closedloop/amd64/dispatch_server/package/global"
 	"cicv-data-closedloop/amd64/dispatch_server/package/handler"
 	"cicv-data-closedloop/amd64/dispatch_server/package/infra"
+	"cicv-data-closedloop/amd64/dispatch_server/package/service"
 	commonHandler "cicv-data-closedloop/common/gin/handler"
 	"cicv-data-closedloop/common/util"
 	_ "embed"
-	"fmt"
+	"encoding/json"
 	"github.com/gin-gonic/gin"
-	"gopkg.in/yaml.v3"
 	"os"
 )
 
-type ApplicationYamlStruct struct {
-	ApplicationName string      `yaml:"application-name"`
-	Web             WebStruct   `yaml:"web"`
-	Log             LogStruct   `yaml:"log"`
-	Redis           RedisStruct `yaml:"redis"`
-	Oss             OssStruct   `yaml:"oss"`
-	GpuNodeList     []GpuNode   `yaml:"gpu_node_list"`
-}
-
-type WebStruct struct {
-	Port        string   `yaml:"port"`
-	RoutePrefix string   `yaml:"route-prefix"`
-	Token       string   `yaml:"token"`
-	WhiteList   []string `yaml:"white-list"`
-}
-
-type LogStruct struct {
-	Dir    string `yaml:"dir"`
-	Prefix string `yaml:"prefix"`
-}
-
-type RedisStruct struct {
-	Addr     string `yaml:"addr"`
-	Password string `yaml:"password"`
-	Db       int    `yaml:"db"`
-}
-type OssStruct struct {
-	IsUseCname      bool   `yaml:"is-use-cname"`
-	Endpoint        string `yaml:"endpoint"`
-	AccessKeyId     string `yaml:"access-key-id"`
-	AccessKeySecret string `yaml:"access-key-secret"`
-	BucketName      string `yaml:"bucket-name"`
-}
-
-type GpuNode struct {
-	Hostname    string `yaml:"hostname"`
-	Ip          string `yaml:"ip"`
-	Parallelism string `yaml:"parallelism"`
-}
-
-var (
-	//go:embed application.yaml
-	applicationYamlBytes []byte
-	ApplicationYaml      ApplicationYamlStruct
-)
-
 func init() {
 	// 1 解析YAML内容
-	_ = yaml.Unmarshal(applicationYamlBytes, &ApplicationYaml)
-	fmt.Println("加载配置文件内容为:", ApplicationYaml)
+	infra.InitApplication()
 	// 2 初始化 日志
-	infra.InitLog(ApplicationYaml.Log.Dir, ApplicationYaml.Log.Prefix)
+	infra.InitLog(infra.ApplicationYaml.Log.Dir, infra.ApplicationYaml.Log.Prefix)
 	// 3 初始化 阿里云oss 客户端
 	infra.InitOss(
-		ApplicationYaml.Oss.IsUseCname,
-		ApplicationYaml.Oss.Endpoint,
-		ApplicationYaml.Oss.AccessKeyId,
-		ApplicationYaml.Oss.AccessKeySecret,
-		ApplicationYaml.Oss.BucketName,
+		infra.ApplicationYaml.Oss.IsUseCname,
+		infra.ApplicationYaml.Oss.Endpoint,
+		infra.ApplicationYaml.Oss.AccessKeyId,
+		infra.ApplicationYaml.Oss.AccessKeySecret,
+		infra.ApplicationYaml.Oss.BucketName,
 	)
 	// 4 初始化 Redis 客户端
 	infra.InitRedisClient(
-		ApplicationYaml.Redis.Addr,
-		ApplicationYaml.Redis.Password,
-		ApplicationYaml.Redis.Db,
+		infra.ApplicationYaml.Redis.Addr,
+		infra.ApplicationYaml.Redis.Password,
+		infra.ApplicationYaml.Redis.Db,
 	)
+	// 5 将 gpu-node-list 写入redis
+	err := infra.GlobalRedisClient.Del(global.KeyGpuNodeList).Err()
+	if err != nil {
+		infra.GlobalLogger.Error("程序崩溃。gpu-node-list 初始化失败1:", err)
+		os.Exit(-1)
+	}
+	for _, gpuNode := range infra.ApplicationYaml.GpuNodeList {
+		gpuNodeJson, err := json.MarshalIndent(gpuNode, "", "    ")
+		if err != nil {
+			infra.GlobalLogger.Error("程序崩溃。gpu-node-list 初始化失败2:", err)
+			os.Exit(-1)
+		}
+		_, err = infra.GlobalRedisClient.RPush("gpu-node-list", gpuNodeJson).Result()
+		if err != nil {
+			infra.GlobalLogger.Error("程序崩溃。gpu-node-list 初始化失败3:", err)
+			os.Exit(-1)
+		}
+	}
+
 }
 
 func main() {
+
+	// 启动任务处理进程
+	go service.RunWaitingUser()
+	go service.RunWaitingCluster()
+
+	// 启动 web 服务器
 	router := gin.Default()
 	router.Use(commonHandler.ValidateHeaders())
-	api := router.Group(ApplicationYaml.Web.RoutePrefix)
+	api := router.Group(infra.ApplicationYaml.Web.RoutePrefix)
 	api.POST("/start-project", handler.StartProject)
-	err := router.Run(":" + ApplicationYaml.Web.Port)
+	err := router.Run(":" + infra.ApplicationYaml.Web.Port)
 	if err != nil {
-		infra.GlobalLogger.Error("程序崩溃,监听端口 " + util.ToString(ApplicationYaml.Web.Port) + " 失败。")
+		infra.GlobalLogger.Error("程序崩溃,监听端口 " + util.ToString(infra.ApplicationYaml.Web.Port) + " 失败。")
 		os.Exit(-1)
 	}
 }

+ 55 - 1
amd64/dispatch_server/package/domain/comm_with_redis.go

@@ -7,9 +7,10 @@ import (
 	"cicv-data-closedloop/amd64/dispatch_server/package/util"
 	"encoding/json"
 	"errors"
+	"fmt"
 )
 
-func CanRun(userId string, userParallelism int64) bool {
+func CanRunUser(userId string, userParallelism int64) bool {
 	TaskQueueRunningUserNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueRunningUserPrefix + ":" + userId).Result()
 	if TaskQueueRunningUserNumber < userParallelism {
 		return true
@@ -17,6 +18,32 @@ func CanRun(userId string, userParallelism int64) bool {
 	return false
 }
 
+// 集群是否有剩余并行度可以运行任务
+//   - return 可运行任务的节点,取并行度剩余最大的节点
+func CanRunCluster() (bool, infra.GpuNode, error) {
+	// 1 获取集群并行度
+	gpuNodeJsons, err := infra.GlobalRedisClient.LRange(global.KeyGpuNodeList, 0, -1).Result()
+	if err != nil {
+		return false, infra.GpuNode{}, errors.New("获取集群并行度列表 " + global.KeyGpuNodeList + " 失败,错误信息为:" + util.ToString(err))
+	}
+	var maxNode infra.GpuNode
+	var maxParallelism int64
+	var can bool
+	maxParallelism = 0
+	for _, gpuNodeJson := range gpuNodeJsons {
+		node, err := JsonToGpuNode(gpuNodeJson)
+		if err != nil {
+			return false, infra.GpuNode{}, err
+		}
+		if node.Parallelism != 0 && node.Parallelism > maxParallelism {
+			maxNode = node
+			maxParallelism = node.Parallelism
+			can = true
+		}
+	}
+	return can, maxNode, nil
+}
+
 func AddWaitingUser(userId string, userParallelism int64, task entity.Task) error {
 	taskCacheTemp := entity.TaskCache{
 		UserId:          userId,
@@ -68,6 +95,21 @@ func AddWaitingCluster(userId string, userParallelism int64, task entity.Task) e
 	return nil
 }
 
+func AddRunningCluster(taskCache entity.TaskCache) error {
+	taskCacheTemp := taskCache
+	// 转 json
+	taskCacheJson, err := TaskCacheToJson(taskCacheTemp)
+	if err != nil {
+		return errors.New("任务缓存对象" + util.ToString(taskCacheTemp) + " 转 json 失败,错误信息为: " + util.ToString(err))
+	}
+	// 添加到集群运行队列
+	err = infra.GlobalRedisClient.RPush(global.KeyTaskQueueRunningCluster, taskCacheJson).Err()
+	if err != nil {
+		return errors.New("任务缓存对象json " + taskCacheJson + " 添加到集群等待队列失败,错误信息为: " + util.ToString(err))
+	}
+	return nil
+}
+
 func TaskToJson(task entity.Task) (string, error) {
 	jsonData, err := json.MarshalIndent(task, "", "    ")
 	if err != nil {
@@ -83,3 +125,15 @@ func TaskCacheToJson(taskCache entity.TaskCache) (string, error) {
 	}
 	return string(jsonData), nil
 }
+
+func JsonToGpuNode(jsonData string) (infra.GpuNode, error) {
+	// 创建一个 Person 类型的变量
+	var taskCache infra.GpuNode
+
+	// 使用 json.Unmarshal 解析 JSON 字符串到结构体
+	err := json.Unmarshal([]byte(jsonData), &taskCache)
+	if err != nil {
+		return infra.GpuNode{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
+	}
+	return taskCache, nil
+}

+ 0 - 18
amd64/dispatch_server/package/entity/kubernetes_scheduler_config.go

@@ -1,18 +0,0 @@
-package entity
-
-type KubernetesSchedulerConfigStruct struct {
-	Service     ServiceStruct `yaml:"service"`
-	GpuNodeList []GpuNode     `yaml:"gpu-node-list"`
-}
-
-type ServiceStruct struct {
-	Port         uint64 `yaml:"port"`
-	Name         string `yaml:"name"`
-	RouterPrefix string `yaml:"router-prefix"`
-}
-
-type GpuNode struct {
-	Hostname    string `yaml:"hostname"`
-	Ip          string `yaml:"ip"`
-	Parallelism string `yaml:"parallelism"`
-}

+ 5 - 0
amd64/dispatch_server/package/global/redis_key.go

@@ -1,6 +1,11 @@
 package global
 
+import "sync"
+
 var (
+	KeyGpuNodeList   = "gpu-node-list" // 用户运行队列
+	GpuNodeListMutex sync.Mutex
+
 	KeyTaskQueueRunningUserPrefix = "task-queue-running-user"    // 用户运行队列
 	KeyTaskQueueWaitingUser       = "task-queue-waiting-user"    // 用户等待队列,等待队列不需要根据userId区分开,资源先到先得
 	KeyTaskQueueRunningCluster    = "task-queue-running-cluster" // 集群运行队列

+ 0 - 0
amd64/dispatch_server/package/global/run_task_sync.go → amd64/dispatch_server/package/global/run_sync.go


+ 1 - 1
amd64/dispatch_server/package/handler/start_project.go

@@ -134,7 +134,7 @@ func StartProject(c *gin.Context) {
 	for _, task := range taskReceived {
 		global.RunTaskMutex.Lock()
 		// 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
-		if domain.CanRun(userId, userParallelism) { // 可以运行
+		if domain.CanRunUser(userId, userParallelism) { // 可以运行
 			err := domain.AddWaitingCluster(userId, userParallelism, task)
 			if err != nil {
 				infra.GlobalLogger.Errorf("将任务 %v 添加到集群等待队列失败,错误信息为:%v", task, err)

+ 6 - 1
amd64/dispatch_server/application.yaml → amd64/dispatch_server/package/infra/application.yaml

@@ -1,5 +1,6 @@
 application-name: dispatch_server
 web:
+  ip-private: 10.14.85.241
   port: 12342
   route-prefix: /dispatch_server
   token: U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM
@@ -29,4 +30,8 @@ gpu-node-list:
     parallelism: 6
   - hostname: gpu004
     ip: 10.14.86.73
-    parallelism: 6
+    parallelism: 6
+
+k8s:
+  pod-yaml-dir: /mnt/disk001/cicv-data-closedloop/pod-yaml/
+  vtd-pod-template-yaml: /mnt/disk001/cicv-data-closedloop/pod-template/vtd-pod-template.yaml

+ 65 - 0
amd64/dispatch_server/package/infra/i_application.go

@@ -0,0 +1,65 @@
+package infra
+
+import (
+	_ "embed"
+	"fmt"
+	"gopkg.in/yaml.v2"
+)
+
+type ApplicationYamlStruct struct {
+	ApplicationName string      `yaml:"application-name"`
+	Web             WebStruct   `yaml:"web"`
+	Log             LogStruct   `yaml:"log"`
+	Redis           RedisStruct `yaml:"redis"`
+	Oss             OssStruct   `yaml:"oss"`
+	GpuNodeList     []GpuNode   `yaml:"gpu-node-list"`
+	K8s             K8sStruct   `yaml:"k8s"`
+}
+
+type WebStruct struct {
+	IpPrivate   string   `yaml:"ip-private"`
+	Port        string   `yaml:"port"`
+	RoutePrefix string   `yaml:"route-prefix"`
+	Token       string   `yaml:"token"`
+	WhiteList   []string `yaml:"white-list"`
+}
+
+type LogStruct struct {
+	Dir    string `yaml:"dir"`
+	Prefix string `yaml:"prefix"`
+}
+
+type RedisStruct struct {
+	Addr     string `yaml:"addr"`
+	Password string `yaml:"password"`
+	Db       int    `yaml:"db"`
+}
+type OssStruct struct {
+	IsUseCname      bool   `yaml:"is-use-cname"`
+	Endpoint        string `yaml:"endpoint"`
+	AccessKeyId     string `yaml:"access-key-id"`
+	AccessKeySecret string `yaml:"access-key-secret"`
+	BucketName      string `yaml:"bucket-name"`
+}
+
+type GpuNode struct {
+	Hostname    string `yaml:"hostname"`
+	Ip          string `yaml:"ip"`
+	Parallelism int64  `yaml:"parallelism"`
+}
+
+type K8sStruct struct {
+	PodYamlDir         string `yaml:"pod-yaml-dir"`
+	VtdPodTemplateYaml string `yaml:"vtd-pod-template-yaml"`
+}
+
+var (
+	//go:embed application.yaml
+	applicationYamlBytes []byte
+	ApplicationYaml      ApplicationYamlStruct
+)
+
+func InitApplication() {
+	_ = yaml.Unmarshal(applicationYamlBytes, &ApplicationYaml)
+	fmt.Println("加载配置文件内容为:", ApplicationYaml)
+}

+ 58 - 9
amd64/dispatch_server/package/service/run_task.go

@@ -5,9 +5,11 @@ import (
 	"cicv-data-closedloop/amd64/dispatch_server/package/entity"
 	"cicv-data-closedloop/amd64/dispatch_server/package/global"
 	"cicv-data-closedloop/amd64/dispatch_server/package/infra"
+	"cicv-data-closedloop/amd64/dispatch_server/package/util"
 	"encoding/json"
 	"errors"
 	"fmt"
+	"strings"
 	"time"
 )
 
@@ -17,7 +19,7 @@ import (
 */
 
 // 判断用户等待队列中的任务是否可以加入到集群等待队列
-func runWaitingUser() {
+func RunWaitingUser() {
 
 	for {
 		time.Sleep(2 * time.Second)
@@ -39,7 +41,7 @@ func runWaitingUser() {
 			userParallelism := taskCache.UserParallelism
 			task := taskCache.Task
 			// 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
-			if domain.CanRun(userId, userParallelism) { // 可以运行
+			if domain.CanRunUser(userId, userParallelism) { // 可以运行
 				err = domain.AddWaitingCluster(userId, userParallelism, task)
 				if err != nil {
 					infra.GlobalLogger.Error(err)
@@ -57,13 +59,60 @@ func runWaitingUser() {
 }
 
 // 集群等待队列中的任务判断是否可以加入集群运行队列
-func runWaitingCluster() {
-	// 2 判断集群并行度
-	//{
-	//	// 1 判断是否可以运行,可以运行的加入集群运行队列,不能运行的加入集群等待队列
-	//	TaskQueueRunningClusterNumber, _ := infra.GlobalRedisClient.LLen(KeyTaskQueueRunningCluster).Result()
-	//	// 2 判断是否可以运行,可以运行的执行运行命令并加入运行队列,不能运行的加入集群等待队列
-	//}
+func RunWaitingCluster() {
+	for {
+		time.Sleep(2 * time.Second)
+		global.GpuNodeListMutex.Lock()
+		// 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
+		can, gpuNode, err := domain.CanRunCluster()
+		if err != nil {
+			infra.GlobalLogger.Error(err)
+			continue
+		}
+		var firstTaskCache entity.TaskCache
+		if can {
+			// 移除并取出
+			firstTaskCacheJson, err := infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
+			if err != nil {
+				infra.GlobalLogger.Error("移除并取出集群等待队列中的头元素报错,错误信息为:", err)
+				continue
+			}
+
+			firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
+			if err != nil {
+				infra.GlobalLogger.Error(err)
+				continue
+			}
+			err = domain.AddRunningCluster(firstTaskCache)
+			if err != nil {
+				infra.GlobalLogger.Error(err)
+				continue
+			}
+		}
+		global.GpuNodeListMutex.Unlock()
+
+		// --------------- 启动 k8s pod ---------------
+		projectId := firstTaskCache.Task.Info.ProjectId
+		nodeName := gpuNode.Hostname
+		// 1 生成 podName
+		podName := "project-" + projectId + "-" + util.NewShortUUID()
+		// 2 生成模板文件名称
+		podYaml := nodeName + "#" + podName + ".yaml"
+		// 3 模板yaml存储路径
+		yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml
+		// 4 模板yaml备份路径
+		yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml
+		// 5
+		podString, err := util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYaml)
+		if err != nil {
+			infra.GlobalLogger.Error(err)
+			continue
+		}
+		podString = strings.Replace(podString, "vtd-container-name", "vtd-"+projectId+"-"+nodeName, 1)
+		podString = strings.Replace(podString, "cicv-data-closedloop-ip", "vtd-"+projectId+"-"+nodeName, 1)
+		// todo golang 连接 kafka
+		podString = strings.Replace(podString, "kafka-ip", "vtd-"+projectId+"-"+nodeName, 1)
+	}
 }
 
 func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {

+ 22 - 0
amd64/dispatch_server/package/util/u_file.go

@@ -0,0 +1,22 @@
+package util
+
+import (
+	"io"
+	"os"
+)
+
+func ReadFile(filePath string) (string, error) {
+	// 1 打开文件
+	file, err := os.Open(filePath)
+	if err != nil {
+		return "", err
+	}
+	defer file.Close()
+
+	// 2 读取文件内容
+	content, err := io.ReadAll(file)
+	if err != nil {
+		return "", err
+	}
+	return string(content), err
+}

+ 70 - 0
amd64/dispatch_server/vtd-pod-template.yaml

@@ -0,0 +1,70 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  name: pod-name
+  namespace: namespace-name
+  labels:
+    user: CICV
+spec:
+  nodeName: node-name
+  dnsPolicy: None
+  dnsConfig:
+    nameservers:
+      #- 223.6.6.6
+      #- 8.8.8.8
+      - 10.16.11.1
+      - 10.16.11.2
+  hostAliases:
+    - ip: 10.14.85.239
+      hostnames:
+        - simulation004
+    - ip: 10.14.85.237
+      hostnames:
+        - gpu001
+  initContainers:
+    - name: init
+      image: algorithm-image
+      imagePullPolicy: Always
+      command: [ 'sh', '-c', 'echo algorithm image downloaded && sleep 2' ]
+  containers:
+    - name: vtd-container-name
+      image: vtd-image
+      imagePullPolicy: Always
+      command: [ "/Controller/VTDController", "vtd-command", "kafka-topic" ]
+      #command: [ "/Controller/VTDController", "/Controller/config/docker_cloud_algContest.ini", "kafka-topic" ]
+      resources:
+        limits:
+          nvidia.com/gpu: 1
+          #nvidia.com/mig-1g.10gb: 1
+      env:
+        - name: PodName
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
+        - name: LM_LICENSE_FILE
+          value: 27500@10.14.85.247
+          #value: 27500@10.14.8.24
+          #value: 27500@172.20.0.2
+        - name: CICV_DATA_CLOSEDLOOP_IP
+          value: cicv-data-closedloop-ip
+        - name: KAFKA_IP
+          value: kafka-ip
+        - name: MINIO_IP
+          value: minio-ip
+        - name: MINIO_ACCESS_KEY
+          value: minio-access-key
+        - name: MINIO_SECRET_KEY
+          value: minio-secret-key
+        - name: KAFKA_PARTITION
+          value: kafka-partition
+        - name: KAFKA_OFFSET
+          value: kafka-offset
+        - name: CPU_ORDER
+          value: cpu-order
+        - name: MINIO_BUCKET_NAME
+          value: minio-bucket
+    - name: algorithm-container
+      image: algorithm-image
+      imagePullPolicy: Never
+      command: [ "/bin/sh", "-c", "/run.sh; touch /tmp/hello.txt;while true;do /bin/echo $(date +%T) >> /tmp/hello.txt; sleep 600; done;" ]
+  restartPolicy: Never