package service import ( "dcl_dispatch_server/src/package/domain" "dcl_dispatch_server/src/package/entity" "dcl_dispatch_server/src/package/global" "dcl_dispatch_server/src/package/infra" "dcl_dispatch_server/src/package/util" "encoding/json" "errors" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "path/filepath" "strconv" "strings" "time" ) /* 负责处理用户等待队列中的任务 负责运行集群等待队列中的任务 */ // RunWaitingUser 判断用户等待队列中的任务是否可以加入到集群等待队列 func RunWaitingUser() { infra.GlobalLogger.Infof("启动【用户等待队列】监控进程。") for { time.Sleep(2 * time.Second) global.RunTaskMutex.Lock() // 获取Redis列表中的值 taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result() if err != nil { infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err) continue } for _, taskCacheJson := range taskCacheJsons { taskCache, err := JsonToTaskCache(taskCacheJson) if err != nil { infra.GlobalLogger.Error(err) continue } userId := taskCache.UserId userParallelism := taskCache.UserParallelism algorithmObjectKey := taskCache.AlgorithmObjectKey equipmentType := taskCache.EquipmentType task := taskCache.Task // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动 if domain.CanRunUser(userId, userParallelism) { // 可以运行 err = domain.AddWaitingCluster(&entity.Project{ UserId: userId, Parallelism: userParallelism, AlgorithmObjectKey: algorithmObjectKey, EquipmentType: equipmentType, }, task) if err != nil { infra.GlobalLogger.Error(err) continue } err = domain.DeleteWaitingUser(task.Info.TaskId) if err != nil { infra.GlobalLogger.Error(err) continue } } } global.RunTaskMutex.Unlock() } } // 集群等待队列中的任务判断是否可以加入集群运行队列 func RunWaitingCluster() { infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。") for { time.Sleep(2 * time.Second) global.GpuNodeListMutex.Lock() // 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动 can, gpuNode, err := domain.CanRunCluster() if err != nil { infra.GlobalLogger.Error(err) global.GpuNodeListMutex.Unlock() continue } firstTaskCache := entity.TaskCache{} algorithmTarName := "" algorithmTarPath := "" algorithmImageName := "" algorithmImageNameWithVersion := "" algorithmExist := false if can { // 判断是否有待运行的任务 waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result() if waitingClusterNumber == 0 { global.GpuNodeListMutex.Unlock() continue } else { infra.GlobalLogger.Infof("集群存在 %v 个等待运行的任务。", waitingClusterNumber) } // 取出并移出,20241017 防止报错后陷入死循环 { firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result() if err != nil { infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err) global.GpuNodeListMutex.Unlock() continue } firstTaskCache, err = JsonToTaskCache(firstTaskCacheJson) if err != nil { infra.GlobalLogger.Error(err) global.GpuNodeListMutex.Unlock() continue } // --------------- 从等待队列中移除 if _, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result(); err != nil { infra.GlobalLogger.Error(err) continue } } // --------------- 下载算法 --------------- { infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey) algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey) algorithmTarPath = infra.ApplicationYaml.K8s.AlgorithmTarTempDir + util.NewShortUUID() + "/" + algorithmTarName _ = util.CreateParentDir(algorithmTarPath) algorithmImageName = infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName) algorithmImageNameWithVersion = algorithmImageName + ":latest" algorithmExist = util.ImageExists(infra.GlobalDockerClient, algorithmImageName) if !algorithmExist { if firstTaskCache.Env == "cicv" { err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath) } else { err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath) } if err != nil { infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err) time.Sleep(time.Duration(2) * time.Second) global.GpuNodeListMutex.Unlock() continue } infra.GlobalLogger.Infof("下载算法 %v 成功。", firstTaskCache.AlgorithmObjectKey) } else { infra.GlobalLogger.Infof("算法 %v 已存在。", algorithmImageName) } } } else { infra.GlobalLogger.Infof("集群没有剩余并行度。") global.GpuNodeListMutex.Unlock() continue } global.GpuNodeListMutex.Unlock() // 获取项目ID projectId := firstTaskCache.Task.Info.ProjectId offsetKey := "offset:" + projectId offset := 0 // 根据项目ID获取偏移量 val, err := infra.GlobalRedisClient.Get(offsetKey).Result() if err != nil { infra.GlobalLogger.Infof("偏移量键 %v 不存在,初始化设置为 0。", offsetKey) err = infra.GlobalRedisClient.Set(offsetKey, 0, 0).Err() if err != nil { infra.GlobalLogger.Infof("偏移量键值对 %v 初始化失败,错误信息为: %v", offsetKey, err) continue } } else { offset, err = strconv.Atoi(val) if err != nil { infra.GlobalLogger.Infof("字符串 %v 转整数失败,错误信息为: %v", val, err) continue } infra.GlobalLogger.Infof("当前任务使用偏移量【%v】", offset) } // 取出偏移量后将缓存中的加一,给下个任务使用。 _, err = infra.GlobalRedisClient.Incr(offsetKey).Result() if err != nil { infra.GlobalLogger.Infof("偏移量 %v 加一失败,错误信息为: %v", offsetKey, err) continue } infra.GlobalLogger.Infof("偏移量【%v】加一给下个任务使用。", offsetKey) // --------------- 发送 kafka 消息(获取偏移量和分区) --------------- // 获取任务消息转json // 将摄像头写死一个参数 firstTaskCache.Task.Vehicle.Sensors.Camera = []entity.SensorCamera{ { Sensor: entity.Sensor{ SensorName: "custom_camera", SensorNear: 1.0, SensorFar: 1500.0, SensorX: -15000.0, SensorY: 0.0, SensorZ: 5000.0, SensorH: 0.0, SensorP: 10.0, SensorR: 0.0, }, SensorForH: 45, SensorForV: 27, SensorResolution: "480*270", SensorFrameRate: 25, }, } taskJson, err := TaskToJson(firstTaskCache.Task) if err != nil { infra.GlobalLogger.Error(err) continue } topic := projectId // 创建一个Message,并指定分区为0 msg := &kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: infra.ApplicationYaml.Kafka.Partition, Offset: kafka.Offset(offset)}, Value: []byte(taskJson), } // 发送消息,并处理结果 err = infra.GlobalKafkaProducer.Produce(msg, nil) if err != nil { infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err) continue } infra.GlobalLogger.Infof("发送任务消息成功,话题为【%v】,偏移量为【%v】。", topic, offset) // 如果新算法需要导入 if !algorithmExist { // 导入算法 infra.GlobalLogger.Infof("导入算法文件【%v】到docker镜像【%v】。", algorithmTarPath, algorithmImageNameWithVersion) _, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageNameWithVersion) infra.GlobalLogger.Infof("推送算法镜像【%v】。", algorithmImageNameWithVersion) _, s, err = util.Execute("docker", "push", algorithmImageNameWithVersion) if err != nil { infra.GlobalLogger.Errorf("导入算法镜像 %v 为 %v 失败,执行结果为:%v,错误信息为:%v", algorithmTarPath, algorithmImageNameWithVersion, s, err) time.Sleep(time.Duration(2) * time.Second) continue } infra.GlobalLogger.Infof("导入算法镜像 %v 为 %v 成功,执行结果为:%v", algorithmTarPath, algorithmImageNameWithVersion, s) err = util.RemoveFile(algorithmTarPath) if err != nil { infra.GlobalLogger.Errorf("删除算法镜像文件 %v 失败,错误信息为:%v", algorithmTarPath, err) } } // --------------- 启动 k8s pod --------------- podName := "project-" + projectId + "-" + util.NewShortUUID() namespaceName := infra.ApplicationYaml.K8s.NamespaceName nodeName := gpuNode.Hostname restParallelism := gpuNode.Parallelism vtdContainer := "vtd-" + projectId algorithmContainer := "algorithm-" + projectId vtdImage := infra.ApplicationYaml.K8s.VtdImage // 2 生成模板文件名称 podYaml := nodeName + "#" + podName + ".yaml" // 3 模板yaml存储路径 yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml // 4 模板yaml备份路径 yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml fmt.Println(yamlPath, yamlPathBak) // 5 podString := "" if firstTaskCache.EquipmentType == "JIN_LONG_BA_SHI" || firstTaskCache.EquipmentType == "PU_JIN_DUO_GONG_NENG_CHE" { // 多功能车仿真 if podString, err = util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYamlPjisuv); err != nil { infra.GlobalLogger.Error(err) continue } podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjisuv, -1) } else { if podString, err = util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYamlPjibot); err != nil { infra.GlobalLogger.Error(err) continue } podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjibot, -1) } if firstTaskCache.Env == "cicv" { podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssCicv.Type, -1) podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssCicv.Endpoint, -1) // 不带http://前缀 podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssCicv.AccessKeyId, -1) podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.OssCicv.AccessKeySecret, -1) podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.OssCicv.BucketName, -1) } else { podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssPji.Type, -1) podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssPji.Endpoint, -1) // 不带http://前缀 podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssPji.AccessKeyId, -1) podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.OssPji.AccessKeySecret, -1) podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.OssPji.BucketName, -1) } podString = strings.Replace(podString, "pod-name", podName, -1) podString = strings.Replace(podString, "namespace-name", namespaceName, -1) podString = strings.Replace(podString, "node-name", nodeName, -1) podString = strings.Replace(podString, "algorithm-image", algorithmImageNameWithVersion, -1) podString = strings.Replace(podString, "vtd-container", vtdContainer, -1) podString = strings.Replace(podString, "vtd-image", vtdImage, -1) podString = strings.Replace(podString, "platform-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1) podString = strings.Replace(podString, "simulation-cloud-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1) podString = strings.Replace(podString, "platform-type", "\""+infra.ApplicationYaml.K8s.PlatformType+"\"", -1) podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Broker, -1) podString = strings.Replace(podString, "kafka-topic", projectId, -1) podString = strings.Replace(podString, "kafka-partition", "\""+util.ToString(infra.ApplicationYaml.Kafka.Partition)+"\"", -1) podString = strings.Replace(podString, "kafka-offset", "\""+util.ToString(offset)+"\"", -1) podString = strings.Replace(podString, "cpu-order", "\""+util.ToString(restParallelism-1)+"\"", -1) // cpu编号是剩余并行度-1 podString = strings.Replace(podString, "algorithm-container", algorithmContainer, -1) // 从xosc中解析起终点位置 xoscOssPath := firstTaskCache.Task.Scenario.ScenarioOsc tempDir := "/mnt/disk001/dcl_dispatch_server/temp/" util.CreateDir(tempDir) xoscLocalPath := tempDir + util.NewShortUUID() + ".xosc" if firstTaskCache.Env == "cicv" { // cicv 或 pji err = infra.GlobalOssBucketCicv.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath) } else { err = infra.GlobalOssBucketPji.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath) } if err != nil { infra.GlobalLogger.Errorf("下载xosc文件【%v】失败,错误信息为:%v", xoscOssPath, err) continue } { s1, s2, s3, s4 := util.GetStartAndEnd(xoscLocalPath) podString = strings.Replace(podString, "start-position-x", "\""+s1+"\"", -1) podString = strings.Replace(podString, "start-position-y", "\""+s2+"\"", -1) podString = strings.Replace(podString, "end-position-x", "\""+s3+"\"", -1) podString = strings.Replace(podString, "end-position-y", "\""+s4+"\"", -1) } // --------------- 保存成文件 err = util.WriteFile(podString, yamlPath) err = util.WriteFile(podString, yamlPathBak) if err != nil { infra.GlobalLogger.Error("保存yaml字符串失败,错误信息为", err) continue } infra.GlobalLogger.Infof("保存yaml文件到执行路径【%v】和备份路径【%v】", yamlPath, yamlPathBak) // --------------- 启动 pod _, s2, err := util.Execute("kubectl", "apply", "-f", yamlPath) if err != nil { infra.GlobalLogger.Errorf("启动pod失败,执行结果为 %v,错误信息为 %v", s2, err) continue } infra.GlobalLogger.Errorf("启动pod成功,执行结果为 %v。", s2) // 收尾 { // --------------- 添加到运行队列 err = domain.AddRunningCluster(firstTaskCache, gpuNode.Hostname) if err != nil { infra.GlobalLogger.Error(err) global.GpuNodeListMutex.Unlock() continue } // --------------- 删除镜像文件 _ = util.RemoveFile(algorithmTarPath) } } } func JsonToTaskCache(jsonData string) (entity.TaskCache, error) { // 创建一个 Person 类型的变量 var taskCache entity.TaskCache // 使用 json.Unmarshal 解析 JSON 字符串到结构体 err := json.Unmarshal([]byte(jsonData), &taskCache) if err != nil { return entity.TaskCache{}, errors.New("对象json " + jsonData + " 转对象失败错误信息为: " + fmt.Sprintf("%v", err)) } return taskCache, nil } func TaskToJson(task entity.Task) (string, error) { jsonData, err := json.Marshal(task) if err != nil { return "", errors.New("转json失败,错误信息为:" + err.Error()) } return string(jsonData), nil }