|
@@ -76,6 +76,7 @@ func RunWaitingCluster() {
|
|
can, gpuNode, err := domain.CanRunCluster()
|
|
can, gpuNode, err := domain.CanRunCluster()
|
|
if err != nil {
|
|
if err != nil {
|
|
infra.GlobalLogger.Error(err)
|
|
infra.GlobalLogger.Error(err)
|
|
|
|
+ global.GpuNodeListMutex.Unlock()
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
var firstTaskCache entity.TaskCache
|
|
var firstTaskCache entity.TaskCache
|
|
@@ -85,6 +86,7 @@ func RunWaitingCluster() {
|
|
waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
|
|
waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
|
|
if waitingClusterNumber == 0 {
|
|
if waitingClusterNumber == 0 {
|
|
infra.GlobalLogger.Info("集群没有等待运行的任务。")
|
|
infra.GlobalLogger.Info("集群没有等待运行的任务。")
|
|
|
|
+ global.GpuNodeListMutex.Unlock()
|
|
continue
|
|
continue
|
|
} else {
|
|
} else {
|
|
infra.GlobalLogger.Infof("集群存在 %v 个等待运行的任务。", waitingClusterNumber)
|
|
infra.GlobalLogger.Infof("集群存在 %v 个等待运行的任务。", waitingClusterNumber)
|
|
@@ -94,6 +96,7 @@ func RunWaitingCluster() {
|
|
firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result()
|
|
firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result()
|
|
if err != nil {
|
|
if err != nil {
|
|
infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
|
|
infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
|
|
|
|
+ global.GpuNodeListMutex.Unlock()
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
|
|
|
|
@@ -106,6 +109,7 @@ func RunWaitingCluster() {
|
|
if err != nil {
|
|
if err != nil {
|
|
infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
|
|
infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
|
|
time.Sleep(time.Duration(2) * time.Second)
|
|
time.Sleep(time.Duration(2) * time.Second)
|
|
|
|
+ global.GpuNodeListMutex.Unlock()
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
infra.GlobalLogger.Infof("下载算法 %v 成功。", firstTaskCache.AlgorithmObjectKey)
|
|
infra.GlobalLogger.Infof("下载算法 %v 成功。", firstTaskCache.AlgorithmObjectKey)
|
|
@@ -113,15 +117,18 @@ func RunWaitingCluster() {
|
|
firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
|
|
firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
|
|
if err != nil {
|
|
if err != nil {
|
|
infra.GlobalLogger.Error(err)
|
|
infra.GlobalLogger.Error(err)
|
|
|
|
+ global.GpuNodeListMutex.Unlock()
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
err = domain.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
|
|
err = domain.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
|
|
if err != nil {
|
|
if err != nil {
|
|
infra.GlobalLogger.Error(err)
|
|
infra.GlobalLogger.Error(err)
|
|
|
|
+ global.GpuNodeListMutex.Unlock()
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
infra.GlobalLogger.Infof("集群没有剩余并行度。")
|
|
infra.GlobalLogger.Infof("集群没有剩余并行度。")
|
|
|
|
+ global.GpuNodeListMutex.Unlock()
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
global.GpuNodeListMutex.Unlock()
|
|
global.GpuNodeListMutex.Unlock()
|