|
@@ -6,9 +6,13 @@ import (
|
|
|
"cicv-data-closedloop/amd64/dispatch_server/package/global"
|
|
|
"cicv-data-closedloop/amd64/dispatch_server/package/infra"
|
|
|
"cicv-data-closedloop/amd64/dispatch_server/package/util"
|
|
|
+ "cicv-data-closedloop/common/config/c_log"
|
|
|
"encoding/json"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
+ "github.com/confluentinc/confluent-kafka-go/kafka"
|
|
|
+ "path/filepath"
|
|
|
+ "strconv"
|
|
|
"strings"
|
|
|
"time"
|
|
|
)
|
|
@@ -39,10 +43,11 @@ func RunWaitingUser() {
|
|
|
}
|
|
|
userId := taskCache.UserId
|
|
|
userParallelism := taskCache.UserParallelism
|
|
|
+ algorithmObjectKey := taskCache.AlgorithmObjectKey
|
|
|
task := taskCache.Task
|
|
|
// 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
|
|
|
if domain.CanRunUser(userId, userParallelism) { // 可以运行
|
|
|
- err = domain.AddWaitingCluster(userId, userParallelism, task)
|
|
|
+ err = domain.AddWaitingCluster(userId, userParallelism, algorithmObjectKey, task)
|
|
|
if err != nil {
|
|
|
infra.GlobalLogger.Error(err)
|
|
|
continue
|
|
@@ -71,10 +76,10 @@ func RunWaitingCluster() {
|
|
|
}
|
|
|
var firstTaskCache entity.TaskCache
|
|
|
if can {
|
|
|
- // 移除并取出
|
|
|
- firstTaskCacheJson, err := infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
|
|
|
+ // 取出但不移除
|
|
|
+ firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result()
|
|
|
if err != nil {
|
|
|
- infra.GlobalLogger.Error("移除并取出集群等待队列中的头元素报错,错误信息为:", err)
|
|
|
+ infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
|
|
|
continue
|
|
|
}
|
|
|
firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
|
|
@@ -89,10 +94,78 @@ func RunWaitingCluster() {
|
|
|
}
|
|
|
}
|
|
|
global.GpuNodeListMutex.Unlock()
|
|
|
+ // 获取项目ID
|
|
|
+ projectId := firstTaskCache.Task.Info.ProjectId
|
|
|
+ offsetKey := "offset:" + projectId
|
|
|
+ offset := 0
|
|
|
+ // 根据项目ID获取偏移量
|
|
|
+ val, err := infra.GlobalRedisClient.Get(offsetKey).Result()
|
|
|
+ if err != nil {
|
|
|
+ infra.GlobalLogger.Infof("偏移量键 %v 不存在,初始化设置为 0。", offsetKey)
|
|
|
+ err = infra.GlobalRedisClient.Set(offsetKey, 0, 0).Err()
|
|
|
+ if err != nil {
|
|
|
+ infra.GlobalLogger.Infof("偏移量键值对 %v 初始化失败,错误信息为: %v", offsetKey, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ offset, err = strconv.Atoi(val)
|
|
|
+ if err != nil {
|
|
|
+ infra.GlobalLogger.Infof("字符串 %v 转整数失败,错误信息为: %v", val, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 取出偏移量后将缓存中的加一,给下个任务使用。
|
|
|
+ _, err = infra.GlobalRedisClient.Incr(offsetKey).Result()
|
|
|
+ if err != nil {
|
|
|
+ infra.GlobalLogger.Infof("偏移量 %v 加一失败,错误信息为: %v", offsetKey, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
// --------------- 发送 kafka 消息(获取偏移量和分区) ---------------
|
|
|
+ // 获取任务消息转json
|
|
|
+ taskJson, err := TaskToJson(firstTaskCache.Task)
|
|
|
+ if err != nil {
|
|
|
+ infra.GlobalLogger.Error(err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ topic := projectId
|
|
|
+ value := []byte(taskJson)
|
|
|
|
|
|
+ // 创建一个Message,并指定分区为0
|
|
|
+ msg := &kafka.Message{
|
|
|
+ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: kafka.Offset(offset)},
|
|
|
+ Value: value,
|
|
|
+ }
|
|
|
+ // 发送消息,并处理结果
|
|
|
+ err = infra.GlobalKafkaProducer.Produce(msg, nil)
|
|
|
+ if err != nil {
|
|
|
+ infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ // --------------- 下载算法 ---------------
|
|
|
+ algorithmTarName := filepath.Base(firstTaskCache.AlgorithmObjectKey)
|
|
|
+ algorithmTarPath := infra.ApplicationYaml.K8s.AlgorithmTarTempDir + algorithmTarName
|
|
|
+ algorithmImageName := infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
|
|
|
+ err = infra.GlobalOssBucket.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
|
|
|
+ time.Sleep(time.Duration(2) * time.Second)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ // 导入算法
|
|
|
+ _, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageName)
|
|
|
+ _, s, err = util.Execute("docker", "push", algorithmImageName)
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Errorf("导入算法镜像 %v 为 %v 失败,执行结果为:%v,错误信息为:%v", algorithmTarPath, algorithmImageName, s, err)
|
|
|
+ time.Sleep(time.Duration(2) * time.Second)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ c_log.GlobalLogger.Infof("导入算法镜像 %v 为 %v 成功,执行结果为:%v", algorithmTarPath, algorithmImageName, s)
|
|
|
+ err = util.RemoveFile(algorithmTarPath)
|
|
|
+ if err != nil {
|
|
|
+ c_log.GlobalLogger.Errorf("删除算法镜像文件 %v 失败,错误信息为:%v", algorithmTarPath, err)
|
|
|
+ }
|
|
|
// --------------- 启动 k8s pod ---------------
|
|
|
- projectId := firstTaskCache.Task.Info.ProjectId
|
|
|
nodeName := gpuNode.Hostname
|
|
|
// 1 生成 podName
|
|
|
podName := "project-" + projectId + "-" + util.NewShortUUID()
|
|
@@ -117,6 +190,13 @@ func RunWaitingCluster() {
|
|
|
podString = strings.Replace(podString, "kafka-partition", projectId, 1)
|
|
|
podString = strings.Replace(podString, "kafka-offset", projectId, 1)
|
|
|
// todo cpu编号是剩余并行度减一
|
|
|
+
|
|
|
+ // --------------- 移除头元素
|
|
|
+ _, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
|
|
|
+ if err != nil {
|
|
|
+ infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -131,3 +211,11 @@ func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
|
|
|
}
|
|
|
return taskCache, nil
|
|
|
}
|
|
|
+
|
|
|
+func TaskToJson(task entity.Task) (string, error) {
|
|
|
+ jsonData, err := json.MarshalIndent(task, "", " ")
|
|
|
+ if err != nil {
|
|
|
+ return "", errors.New("转json失败,错误信息为:" + err.Error())
|
|
|
+ }
|
|
|
+ return string(jsonData), nil
|
|
|
+}
|