|
@@ -66,6 +66,10 @@ func RunWaitingUser() {
|
|
func RunWaitingCluster() {
|
|
func RunWaitingCluster() {
|
|
infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。")
|
|
infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。")
|
|
for {
|
|
for {
|
|
|
|
+ var algorithmTarName string
|
|
|
|
+ var algorithmTarPath string
|
|
|
|
+ var algorithmImageName string
|
|
|
|
+
|
|
time.Sleep(2 * time.Second)
|
|
time.Sleep(2 * time.Second)
|
|
global.GpuNodeListMutex.Lock()
|
|
global.GpuNodeListMutex.Lock()
|
|
// 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
|
|
// 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
|
|
@@ -92,6 +96,18 @@ func RunWaitingCluster() {
|
|
infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
|
|
infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // --------------- 下载算法 ---------------
|
|
|
|
+ algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
|
|
|
|
+ algorithmTarPath = infra.ApplicationYaml.K8s.AlgorithmTarTempDir + algorithmTarName
|
|
|
|
+ algorithmImageName = infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
|
|
|
|
+ err = infra.GlobalOssBucket.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
|
|
|
|
+ if err != nil {
|
|
|
|
+ infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
|
|
|
|
+ time.Sleep(time.Duration(2) * time.Second)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
|
|
firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
|
|
if err != nil {
|
|
if err != nil {
|
|
infra.GlobalLogger.Error(err)
|
|
infra.GlobalLogger.Error(err)
|
|
@@ -111,6 +127,7 @@ func RunWaitingCluster() {
|
|
projectId := firstTaskCache.Task.Info.ProjectId
|
|
projectId := firstTaskCache.Task.Info.ProjectId
|
|
offsetKey := "offset:" + projectId
|
|
offsetKey := "offset:" + projectId
|
|
offset := 0
|
|
offset := 0
|
|
|
|
+
|
|
// 根据项目ID获取偏移量
|
|
// 根据项目ID获取偏移量
|
|
val, err := infra.GlobalRedisClient.Get(offsetKey).Result()
|
|
val, err := infra.GlobalRedisClient.Get(offsetKey).Result()
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -153,16 +170,7 @@ func RunWaitingCluster() {
|
|
infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err)
|
|
infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err)
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
- // --------------- 下载算法 ---------------
|
|
|
|
- algorithmTarName := filepath.Base(firstTaskCache.AlgorithmObjectKey)
|
|
|
|
- algorithmTarPath := infra.ApplicationYaml.K8s.AlgorithmTarTempDir + algorithmTarName
|
|
|
|
- algorithmImageName := infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
|
|
|
|
- err = infra.GlobalOssBucket.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
|
|
|
|
- time.Sleep(time.Duration(2) * time.Second)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
// 导入算法
|
|
// 导入算法
|
|
_, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageName)
|
|
_, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageName)
|
|
_, s, err = util.Execute("docker", "push", algorithmImageName)
|
|
_, s, err = util.Execute("docker", "push", algorithmImageName)
|