LingxinMeng 7 months ago
parent
commit
ed8bd682d0

+ 1 - 0
src/package/domain/comm_with_redis.go

@@ -81,6 +81,7 @@ func DeleteWaitingUser(taskCacheJson string) error {
 
 func AddWaitingCluster(project *entity.Project, task entity.Task) error {
 	taskCacheTemp := entity.TaskCache{
+		Env:                project.Env,
 		UserId:             project.UserId,
 		AlgorithmObjectKey: project.AlgorithmObjectKey,
 		UserParallelism:    project.Parallelism,

+ 1 - 0
src/package/entity/project.go

@@ -1,6 +1,7 @@
 package entity
 
 type Project struct {
+	Env                string `json:"env"` // 环境,需要根据该字段判断用哪个oss // cicv 国汽平台 pji 朴津平台
 	ProjectId          string `json:"projectId"`
 	AlgorithmObjectKey string `json:"algorithmObjectKey"`
 	EquipmentType      string `json:"equipmentType"`

+ 1 - 0
src/package/entity/task_cache.go

@@ -5,5 +5,6 @@ type TaskCache struct {
 	UserParallelism    int64  `json:"userParallelism"`
 	AlgorithmObjectKey string `json:"algorithmObjectKey"`
 	EquipmentType      string `json:"equipmentType"`
+	Env                string `json:"env"`
 	Task               Task   `json:"task"`
 }

+ 4 - 6
src/package/service/run_task.go

@@ -68,7 +68,7 @@ func RunWaitingUser() {
 	}
 }
 
-// RunWaitingCluster 集群等待队列中的任务判断是否可以加入集群运行队列
+// 集群等待队列中的任务判断是否可以加入集群运行队列
 func RunWaitingCluster() {
 	infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。")
 	for {
@@ -90,11 +90,9 @@ func RunWaitingCluster() {
 		algorithmExist := false
 
 		if can {
-			//infra.GlobalLogger.Infof("节点 %v 有剩余并行度。", gpuNode)
 			// 判断是否有待运行的任务
 			waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
 			if waitingClusterNumber == 0 {
-				//infra.GlobalLogger.Info("集群没有等待运行的任务。")
 				global.GpuNodeListMutex.Unlock()
 				continue
 			} else {
@@ -121,7 +119,7 @@ func RunWaitingCluster() {
 					continue
 				}
 			}
-			// --------------- 下载算法 --------------- todo 算法这里需要控制已经下载过的算法就不要再次下载了
+			// --------------- 下载算法 ---------------
 			{
 				infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey)
 				algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
@@ -131,7 +129,7 @@ func RunWaitingCluster() {
 				algorithmImageNameWithVersion = algorithmImageName + ":latest"
 				algorithmExist = util.ImageExists(infra.GlobalDockerClient, algorithmImageName)
 				if !algorithmExist {
-					if firstTaskCache.EquipmentType == "JIN_LONG_BA_SHI" || firstTaskCache.EquipmentType == "PU_JIN_DUO_GONG_NENG_CHE" {
+					if firstTaskCache.Env == "cicv" {
 						err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
 					} else {
 						err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
@@ -303,7 +301,7 @@ func RunWaitingCluster() {
 		tempDir := "/mnt/disk001/dcl_dispatch_server/temp/"
 		util.CreateDir(tempDir)
 		xoscLocalPath := tempDir + util.NewShortUUID() + ".xosc"
-		if firstTaskCache.EquipmentType == "JIN_LONG_BA_SHI" || firstTaskCache.EquipmentType == "PU_JIN_DUO_GONG_NENG_CHE" {
+		if firstTaskCache.Env == "cicv" { // cicv 或 pji
 			err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath)
 		} else {
 			err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath)