|
@@ -1,314 +0,0 @@
|
|
-package service
|
|
|
|
-
|
|
|
|
-import (
|
|
|
|
- "cicv-data-closedloop/amd64/dispatch_server/package/domain"
|
|
|
|
- "cicv-data-closedloop/amd64/dispatch_server/package/entity"
|
|
|
|
- "cicv-data-closedloop/amd64/dispatch_server/package/global"
|
|
|
|
- "cicv-data-closedloop/amd64/dispatch_server/package/infra"
|
|
|
|
- "cicv-data-closedloop/amd64/dispatch_server/package/util"
|
|
|
|
- "encoding/json"
|
|
|
|
- "errors"
|
|
|
|
- "fmt"
|
|
|
|
- "github.com/confluentinc/confluent-kafka-go/kafka"
|
|
|
|
- "path/filepath"
|
|
|
|
- "strconv"
|
|
|
|
- "strings"
|
|
|
|
- "time"
|
|
|
|
-)
|
|
|
|
-
|
|
|
|
-/*
|
|
|
|
- 负责处理用户等待队列中的任务
|
|
|
|
- 负责运行集群等待队列中的任务
|
|
|
|
-*/
|
|
|
|
-
|
|
|
|
-// 判断用户等待队列中的任务是否可以加入到集群等待队列
|
|
|
|
-func RunWaitingUser() {
|
|
|
|
- infra.GlobalLogger.Infof("启动【用户等待队列】监控进程。")
|
|
|
|
- for {
|
|
|
|
- time.Sleep(2 * time.Second)
|
|
|
|
- global.RunTaskMutex.Lock()
|
|
|
|
- // 获取Redis列表中的值
|
|
|
|
- taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- for _, taskCacheJson := range taskCacheJsons {
|
|
|
|
- taskCache, err := JsonToTaskCache(taskCacheJson)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error(err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- userId := taskCache.UserId
|
|
|
|
- userParallelism := taskCache.UserParallelism
|
|
|
|
- algorithmObjectKey := taskCache.AlgorithmObjectKey
|
|
|
|
- task := taskCache.Task
|
|
|
|
- // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
|
|
|
|
- if domain.CanRunUser(userId, userParallelism) { // 可以运行
|
|
|
|
- err = domain.AddWaitingCluster(userId, userParallelism, algorithmObjectKey, task)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error(err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- err = domain.DeleteWaitingUser(task.Info.TaskId)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error(err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- global.RunTaskMutex.Unlock()
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// 集群等待队列中的任务判断是否可以加入集群运行队列
|
|
|
|
-func RunWaitingCluster() {
|
|
|
|
- infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。")
|
|
|
|
- for {
|
|
|
|
- var algorithmTarName string
|
|
|
|
- var algorithmTarPath string
|
|
|
|
- var algorithmImageName string
|
|
|
|
-
|
|
|
|
- time.Sleep(2 * time.Second)
|
|
|
|
- global.GpuNodeListMutex.Lock()
|
|
|
|
- // 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
|
|
|
|
- can, gpuNode, err := domain.CanRunCluster()
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error(err)
|
|
|
|
- global.GpuNodeListMutex.Unlock()
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- var firstTaskCache entity.TaskCache
|
|
|
|
- if can {
|
|
|
|
- //infra.GlobalLogger.Infof("节点 %v 有剩余并行度。", gpuNode)
|
|
|
|
- // 判断是否有待运行的任务
|
|
|
|
- waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
|
|
|
|
- if waitingClusterNumber == 0 {
|
|
|
|
- //infra.GlobalLogger.Info("集群没有等待运行的任务。")
|
|
|
|
- global.GpuNodeListMutex.Unlock()
|
|
|
|
- continue
|
|
|
|
- } else {
|
|
|
|
- infra.GlobalLogger.Infof("集群存在 %v 个等待运行的任务。", waitingClusterNumber)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // 取出但不移除
|
|
|
|
- {
|
|
|
|
- firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result()
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
|
|
|
|
- global.GpuNodeListMutex.Unlock()
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error(err)
|
|
|
|
- global.GpuNodeListMutex.Unlock()
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- // --------------- 下载算法 ---------------
|
|
|
|
- {
|
|
|
|
- infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey)
|
|
|
|
- algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
|
|
|
|
- algorithmTarPath = infra.ApplicationYaml.K8s.AlgorithmTarTempDir + util.NewShortUUID() + "/" + algorithmTarName
|
|
|
|
- _ = util.CreateParentDir(algorithmTarPath)
|
|
|
|
- algorithmImageName = infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
|
|
|
|
- _ = infra.GlobalOssBucket.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
|
|
|
|
- time.Sleep(time.Duration(2) * time.Second)
|
|
|
|
- global.GpuNodeListMutex.Unlock()
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- infra.GlobalLogger.Infof("下载算法 %v 成功。", firstTaskCache.AlgorithmObjectKey)
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- infra.GlobalLogger.Infof("集群没有剩余并行度。")
|
|
|
|
- global.GpuNodeListMutex.Unlock()
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- global.GpuNodeListMutex.Unlock()
|
|
|
|
- // 获取项目ID
|
|
|
|
- projectId := firstTaskCache.Task.Info.ProjectId
|
|
|
|
- offsetKey := "offset:" + projectId
|
|
|
|
- offset := 0
|
|
|
|
-
|
|
|
|
- // 根据项目ID获取偏移量
|
|
|
|
- val, err := infra.GlobalRedisClient.Get(offsetKey).Result()
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Infof("偏移量键 %v 不存在,初始化设置为 0。", offsetKey)
|
|
|
|
- err = infra.GlobalRedisClient.Set(offsetKey, 0, 0).Err()
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Infof("偏移量键值对 %v 初始化失败,错误信息为: %v", offsetKey, err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- offset, err = strconv.Atoi(val)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Infof("字符串 %v 转整数失败,错误信息为: %v", val, err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- infra.GlobalLogger.Infof("当前任务使用偏移量【%v】", offset)
|
|
|
|
- }
|
|
|
|
- // 取出偏移量后将缓存中的加一,给下个任务使用。
|
|
|
|
- _, err = infra.GlobalRedisClient.Incr(offsetKey).Result()
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Infof("偏移量 %v 加一失败,错误信息为: %v", offsetKey, err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- infra.GlobalLogger.Infof("偏移量【%v】加一给下个任务使用。", offsetKey)
|
|
|
|
-
|
|
|
|
- // --------------- 发送 kafka 消息(获取偏移量和分区) ---------------
|
|
|
|
- // 获取任务消息转json
|
|
|
|
- // todo 将摄像头写死一个参数
|
|
|
|
- firstTaskCache.Task.Vehicle.Sensors.Camera = []entity.SensorCamera{
|
|
|
|
- {
|
|
|
|
- Sensor: entity.Sensor{
|
|
|
|
- SensorName: "custom_camera",
|
|
|
|
- SensorNear: 1.0,
|
|
|
|
- SensorFar: 1500.0,
|
|
|
|
- SensorX: -15000.0,
|
|
|
|
- SensorY: 0.0,
|
|
|
|
- SensorZ: 5000.0,
|
|
|
|
- SensorH: 0.0,
|
|
|
|
- SensorP: 10.0,
|
|
|
|
- SensorR: 0.0,
|
|
|
|
- },
|
|
|
|
- SensorForH: 45,
|
|
|
|
- SensorForV: 27,
|
|
|
|
- SensorResolution: "480*270",
|
|
|
|
- SensorFrameRate: 25,
|
|
|
|
- },
|
|
|
|
- }
|
|
|
|
- taskJson, err := TaskToJson(firstTaskCache.Task)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error(err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- topic := projectId
|
|
|
|
- // 创建一个Message,并指定分区为0
|
|
|
|
- msg := &kafka.Message{
|
|
|
|
- TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: infra.ApplicationYaml.Kafka.Partition, Offset: kafka.Offset(offset)},
|
|
|
|
- Value: []byte(taskJson),
|
|
|
|
- }
|
|
|
|
- // 发送消息,并处理结果
|
|
|
|
- err = infra.GlobalKafkaProducer.Produce(msg, nil)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- infra.GlobalLogger.Infof("发送任务消息成功,话题为【%v】,偏移量为【%v】。", topic, offset)
|
|
|
|
-
|
|
|
|
- // 导入算法
|
|
|
|
- _, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageName)
|
|
|
|
- _, s, err = util.Execute("docker", "push", algorithmImageName)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Errorf("导入算法镜像 %v 为 %v 失败,执行结果为:%v,错误信息为:%v", algorithmTarPath, algorithmImageName, s, err)
|
|
|
|
- time.Sleep(time.Duration(2) * time.Second)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- infra.GlobalLogger.Infof("导入算法镜像 %v 为 %v 成功,执行结果为:%v", algorithmTarPath, algorithmImageName, s)
|
|
|
|
- err = util.RemoveFile(algorithmTarPath)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Errorf("删除算法镜像文件 %v 失败,错误信息为:%v", algorithmTarPath, err)
|
|
|
|
- }
|
|
|
|
- // --------------- 启动 k8s pod ---------------
|
|
|
|
- podName := "project-" + projectId + "-" + util.NewShortUUID()
|
|
|
|
- namespaceName := infra.ApplicationYaml.K8s.NamespaceName
|
|
|
|
- nodeName := gpuNode.Hostname
|
|
|
|
- restParallelism := gpuNode.Parallelism
|
|
|
|
- vtdContainer := "vtd-" + projectId
|
|
|
|
- algorithmContainer := "algorithm-" + projectId
|
|
|
|
- vtdImage := infra.ApplicationYaml.K8s.VtdImage
|
|
|
|
- // 2 生成模板文件名称
|
|
|
|
- podYaml := nodeName + "#" + podName + ".yaml"
|
|
|
|
- // 3 模板yaml存储路径
|
|
|
|
- yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml
|
|
|
|
- // 4 模板yaml备份路径
|
|
|
|
- yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml
|
|
|
|
- fmt.Println(yamlPath, yamlPathBak)
|
|
|
|
- // 5
|
|
|
|
- podString, err := util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYaml)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error(err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- podString = strings.Replace(podString, "pod-name", podName, -1)
|
|
|
|
- podString = strings.Replace(podString, "namespace-name", namespaceName, -1)
|
|
|
|
- podString = strings.Replace(podString, "node-name", nodeName, -1)
|
|
|
|
- podString = strings.Replace(podString, "algorithm-image", algorithmImageName, -1)
|
|
|
|
- podString = strings.Replace(podString, "vtd-container", vtdContainer, -1)
|
|
|
|
- podString = strings.Replace(podString, "vtd-image", vtdImage, -1)
|
|
|
|
- podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommand, -1)
|
|
|
|
- podString = strings.Replace(podString, "platform-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
|
|
|
|
- podString = strings.Replace(podString, "simulation-cloud-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
|
|
|
|
- podString = strings.Replace(podString, "platform-type", "\""+infra.ApplicationYaml.K8s.PlatformType+"\"", -1)
|
|
|
|
- podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.Oss.Type, -1)
|
|
|
|
- podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.Oss.Endpoint, -1) // 不带http://前缀
|
|
|
|
- podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.Oss.AccessKeyId, -1)
|
|
|
|
- podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.Oss.AccessKeySecret, -1)
|
|
|
|
- podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.Oss.BucketName, -1)
|
|
|
|
- podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Broker, -1)
|
|
|
|
- podString = strings.Replace(podString, "kafka-topic", projectId, -1)
|
|
|
|
- podString = strings.Replace(podString, "kafka-partition", "\""+util.ToString(infra.ApplicationYaml.Kafka.Partition)+"\"", -1)
|
|
|
|
- podString = strings.Replace(podString, "kafka-offset", "\""+util.ToString(offset)+"\"", -1)
|
|
|
|
- podString = strings.Replace(podString, "cpu-order", "\""+util.ToString(restParallelism-1)+"\"", -1) // cpu编号是剩余并行度-1
|
|
|
|
- podString = strings.Replace(podString, "algorithm-container", algorithmContainer, -1)
|
|
|
|
-
|
|
|
|
- // --------------- 保存成文件
|
|
|
|
- err = util.WriteFile(podString, yamlPath)
|
|
|
|
- err = util.WriteFile(podString, yamlPathBak)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error("保存yaml字符串失败,错误信息为", err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- infra.GlobalLogger.Infof("保存yaml文件到执行路径【%v】和备份路径【%v】", yamlPath, yamlPathBak)
|
|
|
|
- // --------------- 启动 pod
|
|
|
|
- _, s2, err := util.Execute("kubectl", "apply", "-f", yamlPath)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Errorf("启动pod失败,执行结果为 %v,错误信息为 %v", s2, err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- infra.GlobalLogger.Errorf("启动pod成功,执行结果为 %v。", s2)
|
|
|
|
- // 收尾
|
|
|
|
- {
|
|
|
|
- // --------------- 添加到运行队列
|
|
|
|
- err = domain.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error(err)
|
|
|
|
- global.GpuNodeListMutex.Unlock()
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- // --------------- 从等待队列中移除
|
|
|
|
- _, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result()
|
|
|
|
- if err != nil {
|
|
|
|
- infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- // --------------- 删除镜像文件
|
|
|
|
- _ = util.RemoveFile(algorithmTarPath)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func JsonToTaskCache(jsonData string) (entity.TaskCache, error) {
|
|
|
|
- // 创建一个 Person 类型的变量
|
|
|
|
- var taskCache entity.TaskCache
|
|
|
|
-
|
|
|
|
- // 使用 json.Unmarshal 解析 JSON 字符串到结构体
|
|
|
|
- err := json.Unmarshal([]byte(jsonData), &taskCache)
|
|
|
|
- if err != nil {
|
|
|
|
- return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err))
|
|
|
|
- }
|
|
|
|
- return taskCache, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func TaskToJson(task entity.Task) (string, error) {
|
|
|
|
- jsonData, err := json.Marshal(task)
|
|
|
|
- if err != nil {
|
|
|
|
- return "", errors.New("转json失败,错误信息为:" + err.Error())
|
|
|
|
- }
|
|
|
|
- return string(jsonData), nil
|
|
|
|
-}
|
|
|