|
@@ -3,15 +3,18 @@ package com.css.simulation.resource.scheduler.consumer;
|
|
|
|
|
|
import api.common.pojo.constants.DictConstants;
|
|
import api.common.pojo.constants.DictConstants;
|
|
import api.common.pojo.dto.ProjectMessageDTO;
|
|
import api.common.pojo.dto.ProjectMessageDTO;
|
|
-import api.common.util.JsonUtil;
|
|
|
|
-import api.common.util.StringUtil;
|
|
|
|
|
|
+import api.common.util.*;
|
|
import com.css.simulation.resource.scheduler.mapper.*;
|
|
import com.css.simulation.resource.scheduler.mapper.*;
|
|
import com.css.simulation.resource.scheduler.pojo.po.*;
|
|
import com.css.simulation.resource.scheduler.pojo.po.*;
|
|
-import com.css.simulation.resource.scheduler.pojo.to.PrefixTO;
|
|
|
|
|
|
+import com.css.simulation.resource.scheduler.pojo.to.*;
|
|
import com.css.simulation.resource.scheduler.service.ProjectService;
|
|
import com.css.simulation.resource.scheduler.service.ProjectService;
|
|
|
|
+import com.css.simulation.resource.scheduler.util.MinioUtil;
|
|
import com.css.simulation.resource.scheduler.util.ProjectUtil;
|
|
import com.css.simulation.resource.scheduler.util.ProjectUtil;
|
|
|
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
+import io.minio.MinioClient;
|
|
|
|
+import io.minio.errors.*;
|
|
import lombok.SneakyThrows;
|
|
import lombok.SneakyThrows;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
@@ -21,6 +24,9 @@ import org.springframework.kafka.annotation.KafkaListener;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
import javax.annotation.Resource;
|
|
|
|
+import java.io.IOException;
|
|
|
|
+import java.security.InvalidKeyException;
|
|
|
|
+import java.security.NoSuchAlgorithmException;
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
|
|
|
@Component
|
|
@Component
|
|
@@ -34,9 +40,16 @@ public class ProjectConsumer {
|
|
String username;
|
|
String username;
|
|
@Value("${scheduler.host.password}")
|
|
@Value("${scheduler.host.password}")
|
|
String password;
|
|
String password;
|
|
|
|
+ @Value("${scheduler.linux-path.temp}")
|
|
|
|
+ String linuxTempPath;
|
|
|
|
+ @Value("${scheduler.minio-path.project-result}")
|
|
|
|
+ String projectResultPathOfMinio;
|
|
|
|
+ @Value("${minio.bucket-name}")
|
|
|
|
+ String bucketName;
|
|
|
|
|
|
// -------------------------------- Comment --------------------------------
|
|
// -------------------------------- Comment --------------------------------
|
|
-
|
|
|
|
|
|
+ @Resource
|
|
|
|
+ MinioClient minioClient;
|
|
@Resource
|
|
@Resource
|
|
StringRedisTemplate stringRedisTemplate;
|
|
StringRedisTemplate stringRedisTemplate;
|
|
@Resource
|
|
@Resource
|
|
@@ -57,6 +70,165 @@ public class ProjectConsumer {
|
|
ProjectService projectService;
|
|
ProjectService projectService;
|
|
@Resource
|
|
@Resource
|
|
ProjectUtil projectUtil;
|
|
ProjectUtil projectUtil;
|
|
|
|
+ @Resource
|
|
|
|
+ IndexMapper indexMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ TaskMapper taskMapper;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 接收到运行信息立即复制一份数据作为运行数据
|
|
|
|
+ *
|
|
|
|
+ * @param projectRecord 项目启动消息
|
|
|
|
+ */
|
|
|
|
+ @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.start-topic}")
|
|
|
|
+ @SneakyThrows
|
|
|
|
+ public void createTaskAndFixData(ConsumerRecord<String, String> projectRecord) {
|
|
|
|
+ //* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
|
|
|
|
+ String initialProjectJson = projectRecord.value();
|
|
|
|
+ log.info("ProjectConsumer.fixedData() 接收到项目开始消息为:" + initialProjectJson);
|
|
|
|
+ ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(initialProjectJson, ProjectMessageDTO.class);
|
|
|
|
+ String projectId = projectMessageDTO.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
|
+ String packageId = projectMessageDTO.getScenePackageId(); // 场景测试包 id
|
|
|
|
+ String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
|
|
|
|
+ long videoTime = projectMessageDTO.getMaxSimulationTime(); // 结果视频的时长
|
|
|
|
+ long parallelism = projectMessageDTO.getParallelism(); // 项目并行度
|
|
|
|
+ String projectType = projectMessageDTO.getType(); // 项目类型
|
|
|
|
+ String userId = ""; // 用户 id
|
|
|
|
+ if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
|
+ userId = manualProjectMapper.selectCreateUserById(projectId);
|
|
|
|
+ } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
|
|
+ userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
|
|
+ }
|
|
|
|
+ String projectPath = linuxTempPath + projectId + "/";
|
|
|
|
+ FileUtil.mkdir(projectPath);
|
|
|
|
+ // -------------------------------- 1 查询场景 --------------------------------
|
|
|
|
+ //根据场景测试包 packageId,拿到场景集合(不包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
|
|
|
|
+ List<ScenePO> scenePOList = projectService.getSceneList(projectId, packageId, projectPath);
|
|
|
|
+ int taskTotal = scenePOList.size();
|
|
|
|
+ projectMessageDTO.setTaskTotal(taskTotal);
|
|
|
|
+ projectMessageDTO.setTaskCompleted(0);
|
|
|
|
+ //去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
|
|
|
|
+ Set<ScenePO> scenePOSet = new HashSet<>(scenePOList);
|
|
|
|
+ // -------------------------------- 2 查询模型 --------------------------------
|
|
|
|
+ //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
|
|
+ VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
|
+ List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
|
+ List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
|
+ // -------------------------------- 3 保存任务消息 --------------------------------
|
|
|
|
+ log.info("ProjectService--sendTaskMessage 项目 " + projectId + " 获得的包括的场景信息为:" + scenePOSet);
|
|
|
|
+ for (ScenePO scenePO : scenePOSet) {
|
|
|
|
+ String sceneId = scenePO.getId();
|
|
|
|
+ //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
|
|
|
|
+ List<String> lastTargetIdList = null;
|
|
|
|
+ if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
|
+ lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
|
|
|
|
+ } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
|
|
+ lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
|
|
|
|
+ }
|
|
|
|
+ if (CollectionUtil.isEmpty(lastTargetIdList)) {
|
|
|
|
+ throw new RuntimeException("ProjectConsumer.createTaskAndFixData() -- 项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
|
|
|
|
+ }
|
|
|
|
+ for (String lastTargetId : lastTargetIdList) {
|
|
|
|
+ String taskId = StringUtil.getRandomUUID();
|
|
|
|
+ // 保存任务信息
|
|
|
|
+ TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
|
|
|
|
+ .id(taskId)
|
|
|
|
+ .pId(projectId)
|
|
|
|
+ .sceneId(sceneId)
|
|
|
|
+ .lastTargetId(lastTargetId)
|
|
|
|
+ .sceneName(scenePO.getName())
|
|
|
|
+ .sceneType(scenePO.getType())
|
|
|
|
+ .runState(DictConstants.TASK_PENDING)
|
|
|
|
+ .runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId)
|
|
|
|
+ .build();
|
|
|
|
+ taskPO.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
|
+ taskPO.setCreateUserId(userId);
|
|
|
|
+ taskPO.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
|
+ taskPO.setModifyUserId(userId);
|
|
|
|
+ taskPO.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
|
+ taskPO.setIsDeleted("0");
|
|
|
|
+ taskMapper.insert(taskPO);
|
|
|
|
+ // 下载 xodr 和 osgb 供仿真后生成 xosc
|
|
|
|
+ String scenarioOdr = scenePO.getScenarioOdr();
|
|
|
|
+ String scenarioOsgb = scenePO.getScenarioOsgb();
|
|
|
|
+ String[] splitXodr = scenarioOdr.split("/");
|
|
|
|
+ String xodrName = splitXodr[splitXodr.length - 1];
|
|
|
|
+ String[] splitOsgb = scenarioOsgb.split("/");
|
|
|
|
+ String osgbName = splitOsgb[splitOsgb.length - 1];
|
|
|
|
+ try {
|
|
|
|
+ String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
|
|
|
|
+ String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + ".xodr";
|
|
|
|
+ String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
|
|
|
|
+ String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + ".osgb";
|
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
|
|
|
|
+ MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
|
|
|
|
+ log.info("ProjectService--sendTaskMessage 已经将 xodr 上传到 minio 的结果文件目录:" + xodrPathOfMinio);
|
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
|
|
|
|
+ MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
|
|
|
|
+ log.info("ProjectService--sendTaskMessage 已经将 osgb 上传到 minio 的结果文件目录:" + osgbPathOfMinio);
|
|
|
|
+ } catch (IOException | ServerException | InsufficientDataException | ErrorResponseException |
|
|
|
|
+ NoSuchAlgorithmException | InvalidKeyException | InvalidResponseException |
|
|
|
|
+ XmlParserException | InternalException e) {
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 组装 task 消息
|
|
|
|
+ TaskTO taskTO = TaskTO.builder()
|
|
|
|
+ .info(InfoTO.builder()
|
|
|
|
+ .project_id(taskPO.getPId())
|
|
|
|
+ .task_id(taskPO.getId())
|
|
|
|
+ .task_path(taskPO.getRunResultFilePath())
|
|
|
|
+ .default_time(videoTime)
|
|
|
|
+ .build())
|
|
|
|
+ .scenario(ScenarioTO.builder()
|
|
|
|
+ .scenario_osc(scenePO.getScenarioOsc())
|
|
|
|
+ .scenario_odr(scenarioOdr)
|
|
|
|
+ .scenario_osgb(scenarioOsgb)
|
|
|
|
+ .build())
|
|
|
|
+ .vehicle(VehicleTO.builder()
|
|
|
|
+ .model(ModelTO.builder()
|
|
|
|
+ .model_label(vehiclePO.getModelLabel())
|
|
|
|
+ .build())
|
|
|
|
+ .dynamics(DynamicsTO.builder()
|
|
|
|
+ .dynamics_maxspeed(vehiclePO.getMaxSpeed())
|
|
|
|
+ .dynamics_enginepower(vehiclePO.getEnginePower())
|
|
|
|
+ .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
|
|
|
|
+ .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
|
|
|
|
+ .dynamics_mass(vehiclePO.getMass())
|
|
|
|
+ .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
|
|
|
|
+ .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
|
|
|
|
+ .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
|
|
|
|
+ .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
|
|
|
|
+ .dynamics_wheeldrive(vehiclePO.getWheelDrive())
|
|
|
|
+ .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
|
|
|
|
+ .dynamics_distfront(vehiclePO.getFrontDistance())
|
|
|
|
+ .dynamics_distrear(vehiclePO.getRearDistance())
|
|
|
|
+ .dynamics_distleft(vehiclePO.getLeftDistance())
|
|
|
|
+ .dynamics_distright(vehiclePO.getRightDistance())
|
|
|
|
+ .dynamics_distheight(vehiclePO.getHeightDistance())
|
|
|
|
+ .dynamics_wheelbase(vehiclePO.getWheelbase())
|
|
|
|
+ .build())
|
|
|
|
+ .sensors(SensorsTO.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
|
+ .camera(cameraPOList)
|
|
|
|
+ .OGT(ogtPOList)
|
|
|
|
+ .build())
|
|
|
|
+ .build())
|
|
|
|
+ .build();
|
|
|
|
+
|
|
|
|
+ //4-4 将对象转成 json
|
|
|
|
+ String taskJson = "";
|
|
|
|
+ try {
|
|
|
|
+ taskJson = JsonUtil.beanToJson(taskTO);
|
|
|
|
+ } catch (JsonProcessingException e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ //4-5 将 projectId 作为 topic 名称,根据 parallelism 分散发送 task 信息到 kafka
|
|
|
|
+ String finalTaskJson = taskJson;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
|
|
|
|
|
|
/**
|
|
/**
|