LingxinMeng vor 1 Jahr
Ursprung
Commit
db7ba068b7

+ 3 - 0
amd64/dispatch_server/package/infra/i_application.go

@@ -1,6 +1,7 @@
 package infra
 
 import (
+	"cicv-data-closedloop/common/util"
 	_ "embed"
 	"fmt"
 	"gopkg.in/yaml.v2"
@@ -75,4 +76,6 @@ var (
 // InitApplication unmarshals applicationYamlBytes into the package-level
 // ApplicationYaml, prints the loaded configuration to stdout, and then calls
 // util.CreateDir for the configured K8s.AlgorithmTarTempDir.
 //
 // NOTE(review): the error returned by yaml.Unmarshal is discarded, so a
 // malformed configuration is not reported here — confirm this is intended.
 func InitApplication() {
 	_ = yaml.Unmarshal(applicationYamlBytes, &ApplicationYaml)
 	fmt.Println("加载配置文件内容为:", ApplicationYaml)
+	// Create the image download directory.
+	util.CreateDir(ApplicationYaml.K8s.AlgorithmTarTempDir)
 }

+ 25 - 16
amd64/dispatch_server/package/service/run_task.go

@@ -108,18 +108,21 @@ func RunWaitingCluster() {
 				}
 			}
 			// --------------- 下载算法 ---------------
-			infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey)
-			algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
-			algorithmTarPath = infra.ApplicationYaml.K8s.AlgorithmTarTempDir + algorithmTarName
-			algorithmImageName = infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
-			err = infra.GlobalOssBucket.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
-			if err != nil {
-				infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
-				time.Sleep(time.Duration(2) * time.Second)
-				global.GpuNodeListMutex.Unlock()
-				continue
+			// Download the algorithm tarball from OSS. Each attempt gets its own
+			// sub-directory (a fresh short UUID) under AlgorithmTarTempDir, presumably
+			// so tarballs sharing a base name do not overwrite each other.
+			{
+				infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey)
+				algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
+				algorithmTarPath = infra.ApplicationYaml.K8s.AlgorithmTarTempDir + util.NewShortUUID() + "/" + algorithmTarName
+				// NOTE(review): the CreateParentDir result is discarded; a missing
+				// directory would presumably surface as a download error just below.
+				_ = util.CreateParentDir(algorithmTarPath)
+				// The image name is derived from the tarball's base name only.
+				algorithmImageName = infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
+				// Keep the download result in err: discarding it made the check below
+				// test a stale error left over from an earlier statement.
+				err = infra.GlobalOssBucket.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
+				if err != nil {
+					infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
+					// Back off briefly, release the GPU node list lock, and retry on
+					// the next loop iteration.
+					time.Sleep(time.Duration(2) * time.Second)
+					global.GpuNodeListMutex.Unlock()
+					continue
+				}
+				infra.GlobalLogger.Infof("下载算法 %v 成功。", firstTaskCache.AlgorithmObjectKey)
 			}
-			infra.GlobalLogger.Infof("下载算法 %v 成功。", firstTaskCache.AlgorithmObjectKey)
 
 			err = domain.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
 			if err != nil {
@@ -247,12 +250,18 @@ func RunWaitingCluster() {
 			infra.GlobalLogger.Errorf("保存yaml字符串失败,执行结果为 %v,错误信息为 %v", s2, err)
 			continue
 		}
-		// --------------- 移除头元素
-		_, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
-		if err != nil {
-			infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
-			continue
+		// Wrap-up
+		{
+			// --------------- Remove the head element of the cluster waiting queue
+			_, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
+			if err != nil {
+				infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
+				continue
+			}
+			// --------------- Delete the image file at algorithmTarPath; this is not reached when the LPop above fails, because that branch continues. NOTE(review): the RemoveFile result is discarded — confirm best-effort removal is intended.
+			_ = util.RemoveFile(algorithmTarPath)
 		}
+
 	}
 }