|
@@ -68,7 +68,7 @@ func RunWaitingUser() {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-// RunWaitingCluster 集群等待队列中的任务判断是否可以加入集群运行队列
|
|
|
|
|
|
+// RunWaitingCluster checks whether tasks in the cluster waiting queue can be moved to the cluster running queue.
|
|
func RunWaitingCluster() {
|
|
func RunWaitingCluster() {
|
|
infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。")
|
|
infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。")
|
|
for {
|
|
for {
|
|
@@ -90,11 +90,9 @@ func RunWaitingCluster() {
|
|
algorithmExist := false
|
|
algorithmExist := false
|
|
|
|
|
|
if can {
|
|
if can {
|
|
- //infra.GlobalLogger.Infof("节点 %v 有剩余并行度。", gpuNode)
|
|
|
|
// 判断是否有待运行的任务
|
|
// 判断是否有待运行的任务
|
|
waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
|
|
waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
|
|
if waitingClusterNumber == 0 {
|
|
if waitingClusterNumber == 0 {
|
|
- //infra.GlobalLogger.Info("集群没有等待运行的任务。")
|
|
|
|
global.GpuNodeListMutex.Unlock()
|
|
global.GpuNodeListMutex.Unlock()
|
|
continue
|
|
continue
|
|
} else {
|
|
} else {
|
|
@@ -121,7 +119,7 @@ func RunWaitingCluster() {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- // --------------- 下载算法 --------------- todo 算法这里需要控制已经下载过的算法就不要再次下载了
|
|
|
|
|
|
+ // --------------- 下载算法 ---------------
|
|
{
|
|
{
|
|
infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey)
|
|
infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey)
|
|
algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
|
|
algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
|
|
@@ -131,7 +129,7 @@ func RunWaitingCluster() {
|
|
algorithmImageNameWithVersion = algorithmImageName + ":latest"
|
|
algorithmImageNameWithVersion = algorithmImageName + ":latest"
|
|
algorithmExist = util.ImageExists(infra.GlobalDockerClient, algorithmImageName)
|
|
algorithmExist = util.ImageExists(infra.GlobalDockerClient, algorithmImageName)
|
|
if !algorithmExist {
|
|
if !algorithmExist {
|
|
- if firstTaskCache.EquipmentType == "JIN_LONG_BA_SHI" || firstTaskCache.EquipmentType == "PU_JIN_DUO_GONG_NENG_CHE" {
|
|
|
|
|
|
+ if firstTaskCache.Env == "cicv" {
|
|
err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
|
|
err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
|
|
} else {
|
|
} else {
|
|
err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
|
|
err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
|
|
@@ -266,17 +264,20 @@ func RunWaitingCluster() {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjisuv, -1)
|
|
podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjisuv, -1)
|
|
- podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssCicv.Type, -1)
|
|
|
|
- podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssCicv.Endpoint, -1) // 不带http://前缀
|
|
|
|
- podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssCicv.AccessKeyId, -1)
|
|
|
|
- podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.OssCicv.AccessKeySecret, -1)
|
|
|
|
- podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.OssCicv.BucketName, -1)
|
|
|
|
} else {
|
|
} else {
|
|
if podString, err = util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYamlPjibot); err != nil {
|
|
if podString, err = util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYamlPjibot); err != nil {
|
|
infra.GlobalLogger.Error(err)
|
|
infra.GlobalLogger.Error(err)
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjibot, -1)
|
|
podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjibot, -1)
|
|
|
|
+ }
|
|
|
|
+ if firstTaskCache.Env == "cicv" {
|
|
|
|
+ podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssCicv.Type, -1)
|
|
|
|
+ podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssCicv.Endpoint, -1) // 不带http://前缀
|
|
|
|
+ podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssCicv.AccessKeyId, -1)
|
|
|
|
+ podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.OssCicv.AccessKeySecret, -1)
|
|
|
|
+ podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.OssCicv.BucketName, -1)
|
|
|
|
+ } else {
|
|
podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssPji.Type, -1)
|
|
podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssPji.Type, -1)
|
|
podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssPji.Endpoint, -1) // 不带http://前缀
|
|
podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssPji.Endpoint, -1) // 不带http://前缀
|
|
podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssPji.AccessKeyId, -1)
|
|
podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssPji.AccessKeyId, -1)
|
|
@@ -303,7 +304,7 @@ func RunWaitingCluster() {
|
|
tempDir := "/mnt/disk001/dcl_dispatch_server/temp/"
|
|
tempDir := "/mnt/disk001/dcl_dispatch_server/temp/"
|
|
util.CreateDir(tempDir)
|
|
util.CreateDir(tempDir)
|
|
xoscLocalPath := tempDir + util.NewShortUUID() + ".xosc"
|
|
xoscLocalPath := tempDir + util.NewShortUUID() + ".xosc"
|
|
- if firstTaskCache.EquipmentType == "JIN_LONG_BA_SHI" || firstTaskCache.EquipmentType == "PU_JIN_DUO_GONG_NENG_CHE" {
|
|
|
|
|
|
+ if firstTaskCache.Env == "cicv" { // cicv 或 pji
|
|
err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath)
|
|
err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath)
|
|
} else {
|
|
} else {
|
|
err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath)
|
|
err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath)
|