123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- package application
- import (
- "dcl_dispatch_server/src/package/domain/project"
- "dcl_dispatch_server/src/package/domain/task_cache"
- "dcl_dispatch_server/src/package/infra"
- "dcl_dispatch_server/src/package/infra/global"
- "dcl_dispatch_server/src/package/infra/redis"
- "dcl_dispatch_server/src/package/util"
- "fmt"
- "github.com/aliyun/aliyun-oss-go-sdk/oss"
- "path/filepath"
- "strconv"
- "strings"
- "time"
- "github.com/confluentinc/confluent-kafka-go/kafka"
- )
- /*
- 负责处理用户等待队列中的任务
- 负责运行集群等待队列中的任务
- */
- // 判断用户等待队列中的任务是否可以加入到集群等待队列
- func RunWaitingUser() {
- infra.GlobalLogger.Infof("启动【用户等待队列】监控进程。")
- for {
- time.Sleep(2 * time.Second)
- global.RunTaskMutex.Lock()
- // 获取Redis列表中的值
- taskCacheJsons, err := infra.GlobalRedisClient.LRange(global.KeyTaskQueueWaitingUser, 0, -1).Result()
- if err != nil {
- infra.GlobalLogger.Errorf("遍历用户等待队列 %v 失败,错误信息为: %v", global.KeyTaskQueueWaitingUser, err)
- continue
- }
- for _, taskCacheJson := range taskCacheJsons {
- taskCache, err := task_cache.JsonToTaskCache(taskCacheJson)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- userId := taskCache.UserId
- userParallelism := taskCache.UserParallelism
- algorithmObjectKey := taskCache.AlgorithmObjectKey
- equipmentType := taskCache.EquipmentType
- task := taskCache.Task
- // 1 判断用户并行度是否有剩余,有剩余则加入集群等待队列,并从用户等待队列中拿出,没有剩余则不需要改动
- if redis.CanRunUser(userId, userParallelism) { // 可以运行
- err = redis.AddWaitingCluster(&project.Project{
- UserId: userId,
- Parallelism: userParallelism,
- AlgorithmObjectKey: algorithmObjectKey,
- EquipmentType: equipmentType,
- }, task)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- err = redis.DeleteWaitingUser(task.Info.TaskId)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- }
- }
- global.RunTaskMutex.Unlock()
- }
- }
- // 集群等待队列中的任务判断是否可以加入集群运行队列
- func RunWaitingCluster() {
- infra.GlobalLogger.Infof("启动【集群等待队列】监控进程。")
- for {
- time.Sleep(2 * time.Second)
- global.GpuNodeListMutex.Lock()
- // 1 判断用户并行度是否有剩余,有剩余则从集群等待队列取出第一个加入集群运行队列,并运行pod,没有剩余则不需要改动
- can, gpuNode, err := redis.CanRunCluster()
- if err != nil {
- infra.GlobalLogger.Error(err)
- global.GpuNodeListMutex.Unlock()
- continue
- }
- firstTaskCache := task_cache.TaskCache{}
- algorithmTarName := ""
- algorithmTarPath := ""
- algorithmImageName := ""
- algorithmImageNameWithVersion := ""
- algorithmExist := false
- // 确定用哪个oss
- var tempOss *oss.Bucket
- if can {
- // 判断是否有待运行的任务
- waitingClusterNumber, _ := infra.GlobalRedisClient.LLen(global.KeyTaskQueueWaitingCluster).Result()
- if waitingClusterNumber == 0 {
- global.GpuNodeListMutex.Unlock()
- continue
- } else {
- infra.GlobalLogger.Infof("集群存在 %v 个等待运行的任务。", waitingClusterNumber)
- }
- // 取出并移出,20241017 防止报错后陷入死循环
- {
- firstTaskCacheJson, err := infra.GlobalRedisClient.LIndex(global.KeyTaskQueueWaitingCluster, 0).Result()
- if err != nil {
- infra.GlobalLogger.Error("取出集群等待队列中的头元素报错,错误信息为:", err)
- global.GpuNodeListMutex.Unlock()
- continue
- }
- firstTaskCache, err = task_cache.JsonToTaskCache(firstTaskCacheJson)
- if err != nil {
- infra.GlobalLogger.Error(err)
- global.GpuNodeListMutex.Unlock()
- continue
- }
- // --------------- 从等待队列中移除
- if _, err = infra.GlobalRedisClient.LPop(global.KeyTaskQueueWaitingCluster).Result(); err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- }
- if firstTaskCache.Env == global.EnvPji {
- tempOss = infra.GlobalOssBucketPji
- } else {
- tempOss = infra.GlobalOssBucketCicv
- }
- // --------------- 下载算法 ---------------
- {
- infra.GlobalLogger.Infof("开始下载算法 %v。", firstTaskCache.AlgorithmObjectKey)
- algorithmTarName = filepath.Base(firstTaskCache.AlgorithmObjectKey)
- algorithmTarPath = infra.ApplicationYaml.K8s.AlgorithmTarTempDir + util.NewShortUUID() + "/" + algorithmTarName
- _ = util.CreateParentDir(algorithmTarPath)
- algorithmImageName = infra.ApplicationYaml.K8s.RegistryUri + "/cicvdcl_" + util.MD5HashShort(algorithmTarName)
- algorithmImageNameWithVersion = algorithmImageName + ":latest"
- algorithmExist = util.ImageExists(infra.GlobalDockerClient, algorithmImageName)
- if !algorithmExist {
- err = tempOss.GetObjectToFile(firstTaskCache.AlgorithmObjectKey, algorithmTarPath)
- if err != nil {
- infra.GlobalLogger.Error("下载oss上的算法镜像 "+firstTaskCache.AlgorithmObjectKey+" 失败,错误信息为:", err)
- time.Sleep(time.Duration(2) * time.Second)
- global.GpuNodeListMutex.Unlock()
- continue
- }
- infra.GlobalLogger.Infof("下载算法 %v 成功。", firstTaskCache.AlgorithmObjectKey)
- } else {
- infra.GlobalLogger.Infof("算法 %v 已存在。", algorithmImageName)
- }
- }
- } else {
- infra.GlobalLogger.Infof("集群没有剩余并行度。")
- global.GpuNodeListMutex.Unlock()
- continue
- }
- global.GpuNodeListMutex.Unlock()
- // 获取项目ID
- projectId := firstTaskCache.Task.Info.ProjectId
- offsetKey := "offset:" + projectId
- offset := 0
- // 根据项目ID获取偏移量
- val, err := infra.GlobalRedisClient.Get(offsetKey).Result()
- if err != nil {
- //infra.GlobalLogger.Infof("偏移量键 %v 不存在,初始化设置为 0。", offsetKey)
- err = infra.GlobalRedisClient.Set(offsetKey, 0, 0).Err()
- if err != nil {
- infra.GlobalLogger.Infof("偏移量键值对 %v 初始化失败,错误信息为: %v", offsetKey, err)
- continue
- }
- } else {
- offset, err = strconv.Atoi(val)
- if err != nil {
- infra.GlobalLogger.Infof("字符串 %v 转整数失败,错误信息为: %v", val, err)
- continue
- }
- infra.GlobalLogger.Infof("当前任务使用偏移量【%v】", offset)
- }
- // 取出偏移量后将缓存中的加一,给下个任务使用。
- _, err = infra.GlobalRedisClient.Incr(offsetKey).Result()
- if err != nil {
- infra.GlobalLogger.Infof("偏移量 %v 加一失败,错误信息为: %v", offsetKey, err)
- continue
- }
- //infra.GlobalLogger.Infof("偏移量【%v】加一给下个任务使用。", offsetKey)
- // ------- 解析 xosc ,从xosc中解析起终点位置&修改 xodr 和 osgb ---------------
- xoscOssPath := firstTaskCache.Task.Scenario.ScenarioOsc
- tempDir := infra.ApplicationYaml.TempDir
- util.CreateDir(tempDir)
- xoscLocalPath := tempDir + util.NewShortUUID() + ".xosc"
- err = tempOss.GetObjectToFile(firstTaskCache.Task.Scenario.ScenarioOsc, xoscLocalPath)
- if err != nil {
- infra.GlobalLogger.Errorf("下载xosc文件【%v】失败,错误信息为:%v", xoscOssPath, err)
- continue
- }
- startPositionX, startPositionY, startPositionZ, startPositionH, _, _, _, _, xodrPath, osgbPath := util.ParseXosc(xoscLocalPath)
- firstTaskCache.Task.Scenario.ScenarioOdr = xodrPath
- firstTaskCache.Task.Scenario.ScenarioOsgb = osgbPath
- destinationCsvOssKey := getCsvByXosc(xoscOssPath)
- var endPoints []util.DataPoint
- if exist, _ := tempOss.IsObjectExist(destinationCsvOssKey); exist {
- destinationCsvLocalPath := tempDir + util.NewShortUUID() + ".xosc"
- err = tempOss.GetObjectToFile(destinationCsvOssKey, destinationCsvLocalPath)
- endPoints, _ = util.ParseCsv(destinationCsvLocalPath)
- infra.GlobalLogger.Infof("终点文件【%v】的解析结果为:【%v】", destinationCsvLocalPath, endPoints)
- } else {
- endPositionX, endPositionY, endPositionZ, endPositionH := global.DefaultEndPositionX, global.DefaultEndPositionY, global.DefaultEndPositionZ, global.DefaultEndPositionH
- tempEndPoint := util.DataPoint{X: endPositionX, Y: endPositionY, Z: endPositionZ, H: endPositionH}
- endPoints = append(endPoints, tempEndPoint)
- infra.GlobalLogger.Infof("终点文件【%v】不存在,使用默认终点。", destinationCsvOssKey)
- }
- var endPosition string // export END_POSITION="x1,y1,z1,h1;x2,y2,z2,h2;x3,y3,z3,h3;x4,y4,z4,h4"
- // 遍历生成字符串
- var endPositionBuilder strings.Builder
- for _, endPoint := range endPoints {
- if endPositionBuilder.Len() > 0 {
- endPositionBuilder.WriteByte(';')
- }
- endPositionBuilder.WriteString(fmt.Sprintf("%s,%s,%s,%s", endPoint.X, endPoint.Y, endPoint.Z, endPoint.H))
- }
- endPosition = endPositionBuilder.String()
- // ------- 修改OGT传感器显示目标物框 -------
- for i := range firstTaskCache.Task.Vehicle.Sensors.OGT {
- firstTaskCache.Task.Vehicle.Sensors.OGT[i].SensorDisplay = global.DefaultOgtSensorDisplay
- }
- // --------------- 发送 kafka 消息(获取偏移量和分区) ---------------
- // 获取任务消息转json
- firstTaskCache.Task.Vehicle.Sensors.Camera = global.DefaultCameras
- taskJson, err := project.TaskToJson(firstTaskCache.Task)
- if err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- // ------- 发送 -------
- topic := projectId
- // 创建一个Message,并指定分区为0
- msg := &kafka.Message{
- TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: infra.ApplicationYaml.Kafka.Partition, Offset: kafka.Offset(offset)},
- Value: []byte(taskJson),
- }
- // 发送消息,并处理结果
- err = infra.GlobalKafkaProducer.Produce(msg, nil)
- if err != nil {
- infra.GlobalLogger.Infof("发送任务消息 %v 失败,错误信息为: %v", msg, err)
- continue
- }
- infra.GlobalLogger.Infof("发送任务消息成功,话题为【%v】,偏移量为【%v】,消息为【%v】", topic, offset, taskJson)
- // 如果新算法需要导入
- if !algorithmExist {
- // 导入算法
- infra.GlobalLogger.Infof("导入算法文件【%v】到docker镜像【%v】。", algorithmTarPath, algorithmImageNameWithVersion)
- _, s, err := util.Execute("docker", "import", algorithmTarPath, algorithmImageNameWithVersion)
- infra.GlobalLogger.Infof("推送算法镜像【%v】。", algorithmImageNameWithVersion)
- _, s, err = util.Execute("docker", "push", algorithmImageNameWithVersion)
- if err != nil {
- infra.GlobalLogger.Errorf("导入算法镜像 %v 为 %v 失败,执行结果为:%v,错误信息为:%v", algorithmTarPath, algorithmImageNameWithVersion, s, err)
- time.Sleep(time.Duration(2) * time.Second)
- continue
- }
- infra.GlobalLogger.Infof("导入算法镜像 %v 为 %v 成功,执行结果为:%v", algorithmTarPath, algorithmImageNameWithVersion, s)
- err = util.RemoveFile(algorithmTarPath)
- if err != nil {
- infra.GlobalLogger.Errorf("删除算法镜像文件 %v 失败,错误信息为:%v", algorithmTarPath, err)
- }
- }
- // --------------- 启动 k8s pod ---------------
- podName := "project-" + projectId + "-" + util.NewShortUUID()
- namespaceName := infra.ApplicationYaml.K8s.NamespaceName
- nodeName := gpuNode.Hostname
- restParallelism := gpuNode.Parallelism
- vtdContainer := "vtd-" + projectId
- algorithmContainer := "algorithm-" + projectId
- vtdImage := infra.ApplicationYaml.K8s.VtdImage
- // 2 生成模板文件名称
- podYaml := nodeName + "#" + podName + ".yaml"
- // 3 模板yaml存储路径
- yamlPath := infra.ApplicationYaml.K8s.PodYamlDir + podYaml
- // 4 模板yaml备份路径
- yamlPathBak := infra.ApplicationYaml.K8s.PodYamlDir + "bak/" + podYaml
- fmt.Println(yamlPath, yamlPathBak)
- // 5 不同设备类型用不同的模板文件
- podString := ""
- if firstTaskCache.EquipmentType == global.EquipmentTypeKinglong || firstTaskCache.EquipmentType == global.EquipmentTypePjisuv { // 多功能车仿真
- if podString, err = util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYamlPjisuv); err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjisuv, -1)
- } else {
- if podString, err = util.ReadFile(infra.ApplicationYaml.K8s.VtdPodTemplateYamlPjibot); err != nil {
- infra.GlobalLogger.Error(err)
- continue
- }
- podString = strings.Replace(podString, "vtd-command", infra.ApplicationYaml.K8s.VtdCommandPjibot, -1)
- }
- if firstTaskCache.Env == global.EnvPji {
- podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssPji.Type, -1)
- podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssPji.Endpoint, -1) // 不带http://前缀
- podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssPji.AccessKeyId, -1)
- podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.OssPji.AccessKeySecret, -1)
- podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.OssPji.BucketName, -1)
- } else {
- podString = strings.Replace(podString, "oss-type", infra.ApplicationYaml.OssCicv.Type, -1)
- podString = strings.Replace(podString, "oss-ip", infra.ApplicationYaml.OssCicv.Endpoint, -1) // 不带http://前缀
- podString = strings.Replace(podString, "oss-access-key", infra.ApplicationYaml.OssCicv.AccessKeyId, -1)
- podString = strings.Replace(podString, "oss-secret-key", infra.ApplicationYaml.OssCicv.AccessKeySecret, -1)
- podString = strings.Replace(podString, "oss-bucket", infra.ApplicationYaml.OssCicv.BucketName, -1)
- }
- podString = strings.Replace(podString, "pod-name", podName, -1)
- podString = strings.Replace(podString, "namespace-name", namespaceName, -1)
- podString = strings.Replace(podString, "node-name", nodeName, -1)
- podString = strings.Replace(podString, "algorithm-image", algorithmImageNameWithVersion, -1)
- podString = strings.Replace(podString, "vtd-container", vtdContainer, -1)
- podString = strings.Replace(podString, "vtd-image", vtdImage, -1)
- podString = strings.Replace(podString, "platform-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
- podString = strings.Replace(podString, "simulation-cloud-ip", infra.ApplicationYaml.Web.IpPrivate+":"+infra.ApplicationYaml.Web.Port, -1)
- podString = strings.Replace(podString, "platform-type", "\""+infra.ApplicationYaml.K8s.PlatformType+"\"", -1)
- podString = strings.Replace(podString, "kafka-ip", infra.ApplicationYaml.Kafka.Broker, -1)
- podString = strings.Replace(podString, "kafka-topic", projectId, -1)
- podString = strings.Replace(podString, "kafka-partition", "\""+util.ToString(infra.ApplicationYaml.Kafka.Partition)+"\"", -1)
- podString = strings.Replace(podString, "kafka-offset", "\""+util.ToString(offset)+"\"", -1)
- podString = strings.Replace(podString, "cpu-order", "\""+util.ToString(restParallelism-1)+"\"", -1) // cpu编号是剩余并行度-1
- podString = strings.Replace(podString, "algorithm-container", algorithmContainer, -1)
- podString = strings.Replace(podString, "start-position-x", "\""+startPositionX+"\"", -1)
- podString = strings.Replace(podString, "start-position-y", "\""+startPositionY+"\"", -1)
- podString = strings.Replace(podString, "start-position-z", "\""+startPositionZ+"\"", -1)
- podString = strings.Replace(podString, "start-position-h", "\""+startPositionH+"\"", -1)
- podString = strings.Replace(podString, "end-position", "\""+endPosition+"\"", -1)
- // --------------- 保存成文件 ----------------
- err = util.WriteFile(podString, yamlPath)
- err = util.WriteFile(podString, yamlPathBak)
- if err != nil {
- infra.GlobalLogger.Error("保存yaml字符串失败,错误信息为", err)
- continue
- }
- infra.GlobalLogger.Infof("保存yaml文件到执行路径【%v】和备份路径【%v】", yamlPath, yamlPathBak)
- // --------------- 启动 pod
- _, sr, err := util.Execute("kubectl", "apply", "-f", yamlPath)
- if err != nil {
- infra.GlobalLogger.Errorf("启动pod失败,执行结果为 %v,错误信息为 %v", sr, err)
- continue
- }
- infra.GlobalLogger.Errorf("启动pod成功,执行结果为 %v。", sr)
- // 收尾
- {
- // --------------- 添加到运行队列
- err = redis.AddRunningCluster(firstTaskCache, gpuNode.Hostname)
- if err != nil {
- infra.GlobalLogger.Error(err)
- global.GpuNodeListMutex.Unlock()
- continue
- }
- // --------------- 删除镜像文件
- _ = util.RemoveFile(algorithmTarPath)
- }
- }
- }
- func getCsvByXosc(xoscOssKey string) string {
- lastIndex := strings.LastIndex(xoscOssKey, "/")
- dirPath := xoscOssKey[:lastIndex+1] // 包括最后一个 /
- destinationCsvOssKey := dirPath + global.DestinationCsvName
- return destinationCsvOssKey
- }
|