|
@@ -5,13 +5,13 @@ import api.common.pojo.constants.DictConstants;
|
|
import api.common.pojo.dto.ProjectMessageDTO;
|
|
import api.common.pojo.dto.ProjectMessageDTO;
|
|
import api.common.util.*;
|
|
import api.common.util.*;
|
|
import com.css.simulation.resource.scheduler.entity.*;
|
|
import com.css.simulation.resource.scheduler.entity.*;
|
|
-import com.css.simulation.resource.scheduler.service.ProjectManager;
|
|
|
|
-import com.css.simulation.resource.scheduler.service.TaskManager;
|
|
|
|
import com.css.simulation.resource.scheduler.mapper.*;
|
|
import com.css.simulation.resource.scheduler.mapper.*;
|
|
|
|
+import com.css.simulation.resource.scheduler.service.ProjectManager;
|
|
import com.css.simulation.resource.scheduler.service.ProjectService;
|
|
import com.css.simulation.resource.scheduler.service.ProjectService;
|
|
import com.css.simulation.resource.scheduler.util.ApacheKafkaUtil;
|
|
import com.css.simulation.resource.scheduler.util.ApacheKafkaUtil;
|
|
import com.css.simulation.resource.scheduler.util.MinioUtil;
|
|
import com.css.simulation.resource.scheduler.util.MinioUtil;
|
|
import com.css.simulation.resource.scheduler.util.ProjectUtil;
|
|
import com.css.simulation.resource.scheduler.util.ProjectUtil;
|
|
|
|
+import com.css.simulation.resource.scheduler.util.TaskUtil;
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import io.minio.MinioClient;
|
|
import io.minio.MinioClient;
|
|
@@ -35,490 +35,486 @@ import java.util.concurrent.TimeUnit;
|
|
@Component
|
|
@Component
|
|
@Slf4j
|
|
@Slf4j
|
|
public class ProjectConsumer {
|
|
public class ProjectConsumer {
|
|
- @Value("${scheduler.linux-path.temp}")
|
|
|
|
- private String linuxTempPath;
|
|
|
|
- @Value("${scheduler.minio-path.project-result}")
|
|
|
|
- private String projectResultPathOfMinio;
|
|
|
|
- @Value("${minio.bucket-name}")
|
|
|
|
- private String bucketName;
|
|
|
|
-
|
|
|
|
- // -------------------------------- Comment --------------------------------
|
|
|
|
- @Resource
|
|
|
|
- private MinioClient minioClient;
|
|
|
|
- @Resource
|
|
|
|
- private StringRedisTemplate stringRedisTemplate;
|
|
|
|
- @Resource
|
|
|
|
- private ManualProjectMapper manualProjectMapper;
|
|
|
|
- @Resource
|
|
|
|
- private AutoSubProjectMapper autoSubProjectMapper;
|
|
|
|
- @Resource
|
|
|
|
- private VehicleMapper vehicleMapper;
|
|
|
|
- @Resource
|
|
|
|
- private SensorCameraMapper sensorCameraMapper;
|
|
|
|
- @Resource
|
|
|
|
- private SensorOgtMapper sensorOgtMapper;
|
|
|
|
- @Resource
|
|
|
|
- private AlgorithmMapper algorithmMapper;
|
|
|
|
- @Resource
|
|
|
|
- private UserMapper userMapper;
|
|
|
|
- @Resource
|
|
|
|
- private ClusterMapper clusterMapper;
|
|
|
|
- @Resource
|
|
|
|
- private ProjectService projectService;
|
|
|
|
- @Resource
|
|
|
|
- private ProjectUtil projectUtil;
|
|
|
|
- @Resource
|
|
|
|
- private IndexMapper indexMapper;
|
|
|
|
- @Resource
|
|
|
|
- private TaskMapper taskMapper;
|
|
|
|
- @Resource
|
|
|
|
- private TaskManager taskManager;
|
|
|
|
- @Resource
|
|
|
|
- private ProjectManager projectManager;
|
|
|
|
- @Resource
|
|
|
|
- private KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
- @Resource(name = "myKafkaAdmin")
|
|
|
|
- private Admin kafkaAdminClient;
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 接收到运行信息立即复制一份数据作为运行数据
|
|
|
|
- *
|
|
|
|
- * @param projectRecord 项目启动消息
|
|
|
|
- */
|
|
|
|
- @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.start-topic}")
|
|
|
|
- @SneakyThrows
|
|
|
|
- public void acceptMessage(ConsumerRecord<String, String> projectRecord) {
|
|
|
|
- final ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectRecord.value(), ProjectMessageDTO.class);
|
|
|
|
- log.info("接收到项目开始消息为:" + projectMessageDTO);
|
|
|
|
- new Thread(() -> createTaskAndFixData(projectMessageDTO), "fix-" + projectMessageDTO.getProjectId()).start();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- @SneakyThrows
|
|
|
|
- public void createTaskAndFixData(ProjectMessageDTO projectMessageDTO) {
|
|
|
|
- //* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
|
|
|
|
- String projectId = projectMessageDTO.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
|
- String modelType = projectMessageDTO.getModelType(); // 模型类型,1 动力学模型 2 carsim模型
|
|
|
|
- String packageId = projectMessageDTO.getScenePackageId(); // 场景测试包 id
|
|
|
|
- String vehicleConfigId = projectMessageDTO.getVehicleConfigId(); // 模型配置 id
|
|
|
|
- String algorithmId = projectMessageDTO.getAlgorithmId(); // 模型配置 id
|
|
|
|
- long videoTime = projectMessageDTO.getMaxSimulationTime(); // 结果视频的时长
|
|
|
|
- String projectType = projectMessageDTO.getType(); // 项目类型
|
|
|
|
- String userId = ""; // 用户 id
|
|
|
|
- if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
|
- userId = manualProjectMapper.selectCreateUserById(projectId);
|
|
|
|
- } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
|
|
- userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
|
|
|
|
+ @Value("${scheduler.linux-path.temp}")
|
|
|
|
+ private String linuxTempPath;
|
|
|
|
+ @Value("${scheduler.minio-path.project-result}")
|
|
|
|
+ private String projectResultPathOfMinio;
|
|
|
|
+ @Value("${minio.bucket-name}")
|
|
|
|
+ private String bucketName;
|
|
|
|
+
|
|
|
|
+ // -------------------------------- Comment --------------------------------
|
|
|
|
+ @Resource
|
|
|
|
+ private MinioClient minioClient;
|
|
|
|
+ @Resource
|
|
|
|
+ private StringRedisTemplate stringRedisTemplate;
|
|
|
|
+ @Resource
|
|
|
|
+ private ManualProjectMapper manualProjectMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ private AutoSubProjectMapper autoSubProjectMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ private VehicleMapper vehicleMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ private SensorCameraMapper sensorCameraMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ private SensorOgtMapper sensorOgtMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ private AlgorithmMapper algorithmMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ private UserMapper userMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ private ClusterMapper clusterMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ private ProjectService projectService;
|
|
|
|
+ @Resource
|
|
|
|
+ private ProjectUtil projectUtil;
|
|
|
|
+ @Resource
|
|
|
|
+ private IndexMapper indexMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ private TaskMapper taskMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ private TaskUtil taskUtil;
|
|
|
|
+ @Resource
|
|
|
|
+ private ProjectManager projectManager;
|
|
|
|
+ @Resource
|
|
|
|
+ private KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
+ @Resource(name = "myKafkaAdmin")
|
|
|
|
+ private Admin kafkaAdminClient;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 接收到运行信息立即复制一份数据作为运行数据
|
|
|
|
+ *
|
|
|
|
+ * @param projectRecord 项目启动消息
|
|
|
|
+ */
|
|
|
|
+ @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.start-topic}")
|
|
|
|
+ @SneakyThrows
|
|
|
|
+ public void acceptMessage(ConsumerRecord<String, String> projectRecord) {
|
|
|
|
+ final ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectRecord.value(), ProjectMessageDTO.class);
|
|
|
|
+ log.info("接收到项目开始消息为:" + projectMessageDTO);
|
|
|
|
+ new Thread(() -> createTaskAndFixData(projectMessageDTO), "fix-" + projectMessageDTO.getProjectId()).start();
|
|
}
|
|
}
|
|
- String projectPath = linuxTempPath + "project/" + projectId + "/";
|
|
|
|
- FileUtil.mkdir(projectPath);
|
|
|
|
- //5 将该 project 下所有旧的指标得分删除。
|
|
|
|
- taskMapper.deleteByProject(projectId);
|
|
|
|
- indexMapper.deleteFirstTargetScoreByProjectId(projectId);
|
|
|
|
- indexMapper.deleteLastTargetScoreByProjectId(projectId);
|
|
|
|
- // -------------------------------- 1 查询场景 --------------------------------
|
|
|
|
- log.info("项目 " + projectId + " 开始查询场景。");
|
|
|
|
- //根据场景测试包 packageId,拿到场景集合(包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
|
|
|
|
- List<SceneEntity> sceneEntityList = projectService.getSceneList(projectId, packageId);
|
|
|
|
- int taskTotal = sceneEntityList.size();
|
|
|
|
- projectMessageDTO.setTaskTotal(taskTotal);
|
|
|
|
- projectMessageDTO.setTaskCompleted(0);
|
|
|
|
- //去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
|
|
|
|
- Set<SceneEntity> sceneEntitySet = new HashSet<>(sceneEntityList);
|
|
|
|
- log.info("项目 " + projectId + " 场景包括:" + sceneEntitySet);
|
|
|
|
- // -------------------------------- 2 算法导入 --------------------------------
|
|
|
|
- log.info("项目 " + projectId + " 开始算法导入。");
|
|
|
|
- String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
|
|
|
|
- log.info("项目 " + projectId + " 算法已导入:" + algorithmDockerImage);
|
|
|
|
- // -------------------------------- 3 查询模型 --------------------------------
|
|
|
|
- if ("1".equals(modelType)) {
|
|
|
|
- log.info("项目 " + projectId + " 开始查询模型。");
|
|
|
|
- //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
|
|
- VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
|
- List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
|
- List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
|
- // -------------------------------- 4 保存任务消息 --------------------------------
|
|
|
|
- log.info("项目 " + projectId + " 开始保存任务消息。");
|
|
|
|
- List<TaskEntity> taskList = new ArrayList<>();
|
|
|
|
- for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
|
|
- String sceneId = sceneEntity.getId();
|
|
|
|
- //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
|
|
|
|
- List<String> lastTargetIdList = null;
|
|
|
|
- if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
|
- lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
|
|
|
|
- } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
|
|
- lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
|
|
|
|
- }
|
|
|
|
- if (CollectionUtil.isEmpty(lastTargetIdList)) {
|
|
|
|
- throw new RuntimeException("项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
|
|
|
|
- }
|
|
|
|
- for (String lastTargetId : lastTargetIdList) {
|
|
|
|
- String taskId = StringUtil.getRandomUUID();
|
|
|
|
- // 保存任务信息
|
|
|
|
- TaskEntity taskEntity = TaskEntity.builder() // run_start_time 和 run_end_time 不填
|
|
|
|
- .id(taskId).pId(projectId).sceneId(sceneId).lastTargetId(lastTargetId).sceneName(sceneEntity.getName()).sceneType(sceneEntity.getType()).runState(DictConstants.TASK_PENDING).runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId).build();
|
|
|
|
- taskEntity.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
|
- taskEntity.setCreateUserId(userId);
|
|
|
|
- taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
|
- taskEntity.setModifyUserId(userId);
|
|
|
|
- taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
|
- taskEntity.setIsDeleted("0");
|
|
|
|
- taskList.add(taskEntity);
|
|
|
|
- // 将 xosc、xodr、osgb 全部上传到仿真结果路径
|
|
|
|
- String scenarioOsc = sceneEntity.getScenarioOsc();
|
|
|
|
- String[] splitXosc = scenarioOsc.split("/");
|
|
|
|
- String xoscName = splitXosc[splitXosc.length - 1];
|
|
|
|
- String[] xoscNameSplit = xoscName.split("\\.");
|
|
|
|
- String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
|
|
|
|
- String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
|
|
|
|
- String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
|
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
|
|
|
|
- MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
|
|
|
|
-
|
|
|
|
- String scenarioOdr = sceneEntity.getScenarioOdr();
|
|
|
|
- String[] splitXodr = scenarioOdr.split("/");
|
|
|
|
- String xodrName = splitXodr[splitXodr.length - 1];
|
|
|
|
- String[] xodrNameSplit = xodrName.split("\\.");
|
|
|
|
- String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
|
|
|
|
- String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
|
|
|
|
- String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
|
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
|
|
|
|
- MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
|
|
|
|
-
|
|
|
|
- String scenarioOsgb = sceneEntity.getScenarioOsgb();
|
|
|
|
- String[] splitOsgb = scenarioOsgb.split("/");
|
|
|
|
- String osgbName = splitOsgb[splitOsgb.length - 1];
|
|
|
|
- String[] osgbNameSplit = osgbName.split("\\.");
|
|
|
|
- String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
|
|
|
|
- String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
|
|
|
|
- String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
|
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
|
|
|
|
- MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
|
|
|
|
- log.info("已经将 xosc、xodr、osgb 上传到 minio 的结果文件目录:" + projectResultPathOfMinio);
|
|
|
|
-
|
|
|
|
- // 组装 task 消息
|
|
|
|
- TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleTO.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(DynamicsTO.builder().dynamics_maxspeed(vehicleEntity.getMaxSpeed()).dynamics_enginepower(vehicleEntity.getEnginePower()).dynamics_maxdecel(vehicleEntity.getMaxDeceleration()).dynamics_maxsteering(vehicleEntity.getMaxSteeringAngle()).dynamics_mass(vehicleEntity.getMass()).dynamics_frontsurfaceeffective(vehicleEntity.getFrontSurfaceEffective()).dynamics_airdragcoefficient(vehicleEntity.getAirDragCoefficient()).dynamics_rollingresistance(vehicleEntity.getRollingResistanceCoefficient()).dynamics_wheeldiameter(vehicleEntity.getWheelDiameter()).dynamics_wheeldrive(vehicleEntity.getWheelDrive()).dynamics_overallefficiency(vehicleEntity.getOverallEfficiency()).dynamics_distfront(vehicleEntity.getFrontDistance()).dynamics_distrear(vehicleEntity.getRearDistance()).dynamics_distleft(vehicleEntity.getLeftDistance()).dynamics_distright(vehicleEntity.getRightDistance()).dynamics_distheight(vehicleEntity.getHeightDistance()).dynamics_wheelbase(vehicleEntity.getWheelbase()).build()).sensors(SensorsEntity.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
|
- .camera(cameraEntityList).OGT(ogtEntityList).build()).build()).build();
|
|
|
|
- FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
|
|
|
|
- log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ public void createTaskAndFixData(ProjectMessageDTO projectMessageDTO) {
|
|
|
|
+ //* -------------------------------- 0 读取消息,创建临时目录 --------------------------------
|
|
|
|
+ String projectId = projectMessageDTO.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
|
+ String modelType = projectMessageDTO.getModelType(); // 模型类型,1 动力学模型 2 carsim模型
|
|
|
|
+ String packageId = projectMessageDTO.getScenePackageId(); // 场景测试包 id
|
|
|
|
+ String vehicleConfigId = projectMessageDTO.getVehicleConfigId(); // 模型配置 id
|
|
|
|
+ String algorithmId = projectMessageDTO.getAlgorithmId(); // 模型配置 id
|
|
|
|
+ long videoTime = projectMessageDTO.getMaxSimulationTime(); // 结果视频的时长
|
|
|
|
+ String projectType = projectMessageDTO.getType(); // 项目类型
|
|
|
|
+ try {
|
|
|
|
+ String userId = ""; // 用户 id
|
|
|
|
+ if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
|
+ userId = manualProjectMapper.selectCreateUserById(projectId);
|
|
|
|
+ } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
|
|
+ userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
|
|
+ }
|
|
|
|
+ String projectPath = linuxTempPath + "project/" + projectId + "/";
|
|
|
|
+ FileUtil.mkdir(projectPath);
|
|
|
|
+ //5 将该 project 下所有旧的指标得分删除。
|
|
|
|
+ taskMapper.deleteByProject(projectId);
|
|
|
|
+ indexMapper.deleteFirstTargetScoreByProjectId(projectId);
|
|
|
|
+ indexMapper.deleteLastTargetScoreByProjectId(projectId);
|
|
|
|
+ // -------------------------------- 1 查询场景 --------------------------------
|
|
|
|
+ log.info("项目 " + projectId + " 开始查询场景。");
|
|
|
|
+ //根据场景测试包 packageId,拿到场景集合(包括重复场景),重复场景会在发送消息时根据叶子指标发送多次。
|
|
|
|
+ List<SceneEntity> sceneEntityList = projectService.getSceneList(projectId, packageId);
|
|
|
|
+ int taskTotal = sceneEntityList.size();
|
|
|
|
+ projectMessageDTO.setTaskTotal(taskTotal);
|
|
|
|
+ projectMessageDTO.setTaskCompleted(0);
|
|
|
|
+ //去重,之后发送消息的时候会补全指标,如果不去重的话会出现多个场景重复关联多个指标
|
|
|
|
+ Set<SceneEntity> sceneEntitySet = new HashSet<>(sceneEntityList);
|
|
|
|
+ log.info("项目 " + projectId + " 场景包括:" + sceneEntitySet);
|
|
|
|
+ // -------------------------------- 2 算法导入 --------------------------------
|
|
|
|
+ log.info("项目 " + projectId + " 开始算法导入。");
|
|
|
|
+ String algorithmDockerImage = projectService.handleAlgorithm(projectId, algorithmId);
|
|
|
|
+ log.info("项目 " + projectId + " 算法已导入:" + algorithmDockerImage);
|
|
|
|
+ // -------------------------------- 3 查询模型 --------------------------------
|
|
|
|
+ if ("1".equals(modelType)) {
|
|
|
|
+ log.info("项目 " + projectId + " 开始查询模型。");
|
|
|
|
+ //2-1 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
|
|
+ VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
|
+ List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
|
+ List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
|
+ // -------------------------------- 4 保存任务消息 --------------------------------
|
|
|
|
+ log.info("项目 " + projectId + " 开始保存任务消息。");
|
|
|
|
+ List<TaskEntity> taskList = new ArrayList<>();
|
|
|
|
+ for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
|
|
+ String sceneId = sceneEntity.getId();
|
|
|
|
+ //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
|
|
|
|
+ List<String> lastTargetIdList = null;
|
|
|
|
+ if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
|
+ lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
|
|
|
|
+ } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
|
|
+ lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
|
|
|
|
+ }
|
|
|
|
+ if (CollectionUtil.isEmpty(lastTargetIdList)) {
|
|
|
|
+ throw new RuntimeException("项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
|
|
|
|
+ }
|
|
|
|
+ for (String lastTargetId : lastTargetIdList) {
|
|
|
|
+ String taskId = StringUtil.getRandomUUID();
|
|
|
|
+ // 保存任务信息
|
|
|
|
+ TaskEntity taskEntity = TaskEntity.builder() // run_start_time 和 run_end_time 不填
|
|
|
|
+ .id(taskId).pId(projectId).sceneId(sceneId).lastTargetId(lastTargetId).sceneName(sceneEntity.getName()).sceneType(sceneEntity.getType()).runState(DictConstants.TASK_PENDING).runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId).build();
|
|
|
|
+ taskEntity.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
|
+ taskEntity.setCreateUserId(userId);
|
|
|
|
+ taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
|
+ taskEntity.setModifyUserId(userId);
|
|
|
|
+ taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
|
+ taskEntity.setIsDeleted("0");
|
|
|
|
+ taskList.add(taskEntity);
|
|
|
|
+ // 将 xosc、xodr、osgb 全部上传到仿真结果路径
|
|
|
|
+ String scenarioOsc = sceneEntity.getScenarioOsc();
|
|
|
|
+ String[] splitXosc = scenarioOsc.split("/");
|
|
|
|
+ String xoscName = splitXosc[splitXosc.length - 1];
|
|
|
|
+ String[] xoscNameSplit = xoscName.split("\\.");
|
|
|
|
+ String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
|
|
|
|
+ String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
|
|
|
|
+ String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
|
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
|
|
|
|
+ MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
|
|
|
|
+
|
|
|
|
+ String scenarioOdr = sceneEntity.getScenarioOdr();
|
|
|
|
+ String[] splitXodr = scenarioOdr.split("/");
|
|
|
|
+ String xodrName = splitXodr[splitXodr.length - 1];
|
|
|
|
+ String[] xodrNameSplit = xodrName.split("\\.");
|
|
|
|
+ String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
|
|
|
|
+ String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
|
|
|
|
+ String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
|
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
|
|
|
|
+ MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
|
|
|
|
+
|
|
|
|
+ String scenarioOsgb = sceneEntity.getScenarioOsgb();
|
|
|
|
+ String[] splitOsgb = scenarioOsgb.split("/");
|
|
|
|
+ String osgbName = splitOsgb[splitOsgb.length - 1];
|
|
|
|
+ String[] osgbNameSplit = osgbName.split("\\.");
|
|
|
|
+ String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
|
|
|
|
+ String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
|
|
|
|
+ String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
|
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
|
|
|
|
+ MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
|
|
|
|
+ log.info("已经将 xosc、xodr、osgb 上传到 minio 的结果文件目录:" + projectResultPathOfMinio);
|
|
|
|
+
|
|
|
|
+ // 组装 task 消息
|
|
|
|
+ TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleTO.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(DynamicsTO.builder().dynamics_maxspeed(vehicleEntity.getMaxSpeed()).dynamics_enginepower(vehicleEntity.getEnginePower()).dynamics_maxdecel(vehicleEntity.getMaxDeceleration()).dynamics_maxsteering(vehicleEntity.getMaxSteeringAngle()).dynamics_mass(vehicleEntity.getMass()).dynamics_frontsurfaceeffective(vehicleEntity.getFrontSurfaceEffective()).dynamics_airdragcoefficient(vehicleEntity.getAirDragCoefficient()).dynamics_rollingresistance(vehicleEntity.getRollingResistanceCoefficient()).dynamics_wheeldiameter(vehicleEntity.getWheelDiameter()).dynamics_wheeldrive(vehicleEntity.getWheelDrive()).dynamics_overallefficiency(vehicleEntity.getOverallEfficiency()).dynamics_distfront(vehicleEntity.getFrontDistance()).dynamics_distrear(vehicleEntity.getRearDistance()).dynamics_distleft(vehicleEntity.getLeftDistance()).dynamics_distright(vehicleEntity.getRightDistance()).dynamics_distheight(vehicleEntity.getHeightDistance()).dynamics_wheelbase(vehicleEntity.getWheelbase()).build()).sensors(SensorsEntity.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
|
+ .camera(cameraEntityList).OGT(ogtEntityList).build()).build()).build();
|
|
|
|
+ FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
|
|
|
|
+ log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ taskUtil.batchInsertTask(taskList);
|
|
|
|
+ log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
|
|
|
|
+ } else if ("2".equals(modelType)) {
|
|
|
|
+ log.info("项目 " + projectId + " 开始查询模型。");
|
|
|
|
+
|
|
|
|
+ VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
|
+ List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
|
+ List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
|
+ // -------------------------------- 4 保存任务消息 --------------------------------
|
|
|
|
+ log.info("项目 " + projectId + " 开始保存任务消息。");
|
|
|
|
+ List<TaskEntity> taskList = new ArrayList<>();
|
|
|
|
+ for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
|
|
+ String sceneId = sceneEntity.getId();
|
|
|
|
+ //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
|
|
|
|
+ List<String> lastTargetIdList = null;
|
|
|
|
+ if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
|
+ lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
|
|
|
|
+ } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
|
|
+ lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
|
|
|
|
+ }
|
|
|
|
+ if (CollectionUtil.isEmpty(lastTargetIdList)) {
|
|
|
|
+ throw new RuntimeException("项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
|
|
|
|
+ }
|
|
|
|
+ for (String lastTargetId : lastTargetIdList) {
|
|
|
|
+ String taskId = StringUtil.getRandomUUID();
|
|
|
|
+ // 保存任务信息
|
|
|
|
+ TaskEntity taskEntity = TaskEntity.builder() // run_start_time 和 run_end_time 不填
|
|
|
|
+ .id(taskId).pId(projectId).sceneId(sceneId).lastTargetId(lastTargetId).sceneName(sceneEntity.getName()).sceneType(sceneEntity.getType()).runState(DictConstants.TASK_PENDING).runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId).build();
|
|
|
|
+ taskEntity.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
|
+ taskEntity.setCreateUserId(userId);
|
|
|
|
+ taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
|
+ taskEntity.setModifyUserId(userId);
|
|
|
|
+ taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
|
+ taskEntity.setIsDeleted("0");
|
|
|
|
+ taskList.add(taskEntity);
|
|
|
|
+ // 将 xosc、xodr、osgb 全部上传到仿真结果路径
|
|
|
|
+ String scenarioOsc = sceneEntity.getScenarioOsc();
|
|
|
|
+ String[] splitXosc = scenarioOsc.split("/");
|
|
|
|
+ String xoscName = splitXosc[splitXosc.length - 1];
|
|
|
|
+ String[] xoscNameSplit = xoscName.split("\\.");
|
|
|
|
+ String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
|
|
|
|
+ String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
|
|
|
|
+ String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
|
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
|
|
|
|
+ MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
|
|
|
|
+
|
|
|
|
+ String scenarioOdr = sceneEntity.getScenarioOdr();
|
|
|
|
+ String[] splitXodr = scenarioOdr.split("/");
|
|
|
|
+ String xodrName = splitXodr[splitXodr.length - 1];
|
|
|
|
+ String[] xodrNameSplit = xodrName.split("\\.");
|
|
|
|
+ String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
|
|
|
|
+ String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
|
|
|
|
+ String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
|
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
|
|
|
|
+ MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
|
|
|
|
+
|
|
|
|
+ String scenarioOsgb = sceneEntity.getScenarioOsgb();
|
|
|
|
+ String[] splitOsgb = scenarioOsgb.split("/");
|
|
|
|
+ String osgbName = splitOsgb[splitOsgb.length - 1];
|
|
|
|
+ String[] osgbNameSplit = osgbName.split("\\.");
|
|
|
|
+ String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
|
|
|
|
+ String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
|
|
|
|
+ String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
|
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
|
|
|
|
+ MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
|
|
|
|
+ log.info("已经将 xosc、xodr、osgb 上传到 minio 的结果文件目录:" + projectResultPathOfMinio);
|
|
|
|
+
|
|
|
|
+ // 组装 task 消息
|
|
|
|
+ // carsim 不需要查询模型参数
|
|
|
|
+ TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleTO.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(null).sensors(SensorsEntity.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
|
+ .camera(cameraEntityList).OGT(ogtEntityList).build()).build()).build();
|
|
|
|
+
|
|
|
|
+ FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
|
|
|
|
+ log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ taskUtil.batchInsertTask(taskList);
|
|
|
|
+ log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //* -------------------------------- 4 开始排队 --------------------------------
|
|
|
|
+ cacheProject(projectMessageDTO);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error("项目报错。", e);
|
|
|
|
+ projectService.stopProject(projectId, projectType);
|
|
|
|
+ throw new RuntimeException(e);
|
|
}
|
|
}
|
|
- }
|
|
|
|
- taskManager.batchInsertTask(taskList);
|
|
|
|
- log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
|
|
|
|
- } else if ("2".equals(modelType)) {
|
|
|
|
- log.info("项目 " + projectId + " 开始查询模型。");
|
|
|
|
-
|
|
|
|
- VehicleEntity vehicleEntity = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 车辆
|
|
|
|
- List<CameraEntity> cameraEntityList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
|
- List<OgtEntity> ogtEntityList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
|
- // -------------------------------- 4 保存任务消息 --------------------------------
|
|
|
|
- log.info("项目 " + projectId + " 开始保存任务消息。");
|
|
|
|
- List<TaskEntity> taskList = new ArrayList<>();
|
|
|
|
- for (SceneEntity sceneEntity : sceneEntitySet) {
|
|
|
|
- String sceneId = sceneEntity.getId();
|
|
|
|
- //3-1 可能会存在多个指标下有同样的场景,所以会查出多个指标,多个指标的场景需要发送多次
|
|
|
|
- List<String> lastTargetIdList = null;
|
|
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 任务运行前首先判断用户是否拥有可分配资源
|
|
|
|
+ *
|
|
|
|
+ * @param projectMessageDTO 项目启动消息
|
|
|
|
+ */
|
|
|
|
+ @SneakyThrows
|
|
|
|
+ public void cacheProject(ProjectMessageDTO projectMessageDTO) {
|
|
|
|
+ log.info("判断用户是否拥有可分配资源:" + projectMessageDTO);
|
|
|
|
+ //1 读取 kafka 的 project 信息
|
|
|
|
+ String projectId = projectMessageDTO.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
|
+ long parallelism = projectMessageDTO.getParallelism(); // 项目并行度
|
|
|
|
+ String projectType = projectMessageDTO.getType(); // 项目类型
|
|
|
|
+ //2 根据 projectId 获取创建用户 id
|
|
|
|
+ String userId;
|
|
if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
- lastTargetIdList = indexMapper.selectLeafIndexIdByManualProjectIdAndSceneId(projectId, "%" + sceneId + "%");
|
|
|
|
|
|
+ userId = manualProjectMapper.selectCreateUserById(projectId);
|
|
} else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
} else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
- lastTargetIdList = indexMapper.selectLeafIndexIdByAutoSubProjectIdAndSceneId(projectId, "%" + sceneId + "%");
|
|
|
|
|
|
+ userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
|
|
+ } else {
|
|
|
|
+ log.error("项目类型错误:" + projectMessageDTO);
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
- if (CollectionUtil.isEmpty(lastTargetIdList)) {
|
|
|
|
- throw new RuntimeException("项目 " + projectId + " 使用的场景测试包 " + sceneId + " 不存在指标。");
|
|
|
|
|
|
+ if (StringUtil.isEmpty(userId)) {
|
|
|
|
+ log.error("未查询到项目创建人:" + projectMessageDTO);
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
- for (String lastTargetId : lastTargetIdList) {
|
|
|
|
- String taskId = StringUtil.getRandomUUID();
|
|
|
|
- // 保存任务信息
|
|
|
|
- TaskEntity taskEntity = TaskEntity.builder() // run_start_time 和 run_end_time 不填
|
|
|
|
- .id(taskId).pId(projectId).sceneId(sceneId).lastTargetId(lastTargetId).sceneName(sceneEntity.getName()).sceneType(sceneEntity.getType()).runState(DictConstants.TASK_PENDING).runResultFilePath(projectResultPathOfMinio + projectId + "/" + taskId).build();
|
|
|
|
- taskEntity.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
|
- taskEntity.setCreateUserId(userId);
|
|
|
|
- taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
|
- taskEntity.setModifyUserId(userId);
|
|
|
|
- taskEntity.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
|
- taskEntity.setIsDeleted("0");
|
|
|
|
- taskList.add(taskEntity);
|
|
|
|
- // 将 xosc、xodr、osgb 全部上传到仿真结果路径
|
|
|
|
- String scenarioOsc = sceneEntity.getScenarioOsc();
|
|
|
|
- String[] splitXosc = scenarioOsc.split("/");
|
|
|
|
- String xoscName = splitXosc[splitXosc.length - 1];
|
|
|
|
- String[] xoscNameSplit = xoscName.split("\\.");
|
|
|
|
- String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
|
|
|
|
- String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
|
|
|
|
- String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xoscSuffix;
|
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
|
|
|
|
- MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
|
|
|
|
-
|
|
|
|
- String scenarioOdr = sceneEntity.getScenarioOdr();
|
|
|
|
- String[] splitXodr = scenarioOdr.split("/");
|
|
|
|
- String xodrName = splitXodr[splitXodr.length - 1];
|
|
|
|
- String[] xodrNameSplit = xodrName.split("\\.");
|
|
|
|
- String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
|
|
|
|
- String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
|
|
|
|
- String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + xodrSuffix;
|
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
|
|
|
|
- MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
|
|
|
|
-
|
|
|
|
- String scenarioOsgb = sceneEntity.getScenarioOsgb();
|
|
|
|
- String[] splitOsgb = scenarioOsgb.split("/");
|
|
|
|
- String osgbName = splitOsgb[splitOsgb.length - 1];
|
|
|
|
- String[] osgbNameSplit = osgbName.split("\\.");
|
|
|
|
- String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
|
|
|
|
- String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
|
|
|
|
- String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + "." + osgbSuffix;
|
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
|
|
|
|
- MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
|
|
|
|
- log.info("已经将 xosc、xodr、osgb 上传到 minio 的结果文件目录:" + projectResultPathOfMinio);
|
|
|
|
-
|
|
|
|
- // 组装 task 消息
|
|
|
|
- // carsim 不需要查询模型参数
|
|
|
|
- TaskMessageEntity taskMessageEntity = TaskMessageEntity.builder().info(InfoEntity.builder().project_id(taskEntity.getPId()).task_id(taskEntity.getId()).task_path(taskEntity.getRunResultFilePath()).default_time(videoTime).build()).scenario(ScenarioEntity.builder().scenario_osc(xoscPathOfMinio).scenario_odr(xodrPathOfMinio).scenario_osgb(osgbPathOfMinio).build()).vehicle(VehicleTO.builder().model(ModelEntity.builder().model_label(vehicleEntity.getModelLabel()).build()).dynamics(null).sensors(SensorsEntity.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
|
- .camera(cameraEntityList).OGT(ogtEntityList).build()).build()).build();
|
|
|
|
-
|
|
|
|
- FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskMessageEntity), projectPath + taskId + ".json");
|
|
|
|
- log.info("项目 " + projectId + " 将任务消息转成 json 保存到临时目录等待资源分配后执行:" + taskMessageEntity.getInfo().getTask_id());
|
|
|
|
|
|
+ //3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
|
|
+ UserEntity userEntity = userMapper.selectById(userId);
|
|
|
|
+ log.info("项目 " + projectId + " 的创建人为:" + userEntity);
|
|
|
|
+ String roleCode = userEntity.getRoleCode();
|
|
|
|
+ String useType = userEntity.getUseType();
|
|
|
|
+ ClusterEntity clusterEntity;
|
|
|
|
+ if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
|
+ log.info("项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
|
|
+ PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
|
+ run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
|
+ return;
|
|
|
|
+ } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
|
+ clusterEntity = clusterMapper.selectByUserId(userId);
|
|
|
|
+ log.info("项目 " + projectId + " 的创建人 " + userId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterEntity);
|
|
|
|
+ } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
|
|
+ if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) { //3-3 普通子账户,根据自己的独占节点排队
|
|
|
|
+ clusterEntity = clusterMapper.selectByUserId(userId);
|
|
|
|
+ log.info("项目 " + projectId + " 的创建人 " + userId + " 为普通独占子账户(自己的集群),集群为:" + clusterEntity);
|
|
|
|
+ } else { //3-4 共享子账户,根据父账户的共享节点排队
|
|
|
|
+ String parentUserId = userEntity.getCreateUserId();
|
|
|
|
+ clusterEntity = clusterMapper.selectByUserId(parentUserId);
|
|
|
|
+ log.info("项目 " + projectId + " 的创建人 " + userId + " 为普通共享子账户(父账户的集群),集群为:" + clusterEntity);
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ log.error("项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ // 获取拥有的节点数量,即仿真软件证书数量
|
|
|
|
+ String clusterId = clusterEntity.getId();
|
|
|
|
+ int simulationLicenseNumber = clusterEntity.getNumSimulationLicense();
|
|
|
|
+ // 获取该集群中正在运行的项目,如果没有则立即执行
|
|
|
|
+ PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
|
|
|
|
+ // 获取正在运行的项目的并行度总和
|
|
|
|
+ int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
|
|
|
|
+ // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
|
|
+ if (currentParallelismSum + parallelism <= simulationLicenseNumber) {
|
|
|
|
+ run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
|
+ } else {
|
|
|
|
+ log.info("项目 " + projectId + " 并行度超出账户允许,加入等待队列,暂不执行。 ");
|
|
|
|
+ wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
|
|
}
|
|
}
|
|
- }
|
|
|
|
- taskManager.batchInsertTask(taskList);
|
|
|
|
- log.info("项目 " + projectId + " 共有 " + taskList.size() + " 个任务,已保存到数据库");
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- //* -------------------------------- 4 开始排队 --------------------------------
|
|
|
|
- cacheProject(projectMessageDTO);
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 任务运行前首先判断用户是否拥有可分配资源
|
|
|
|
- *
|
|
|
|
- * @param projectMessageDTO 项目启动消息
|
|
|
|
- */
|
|
|
|
- @SneakyThrows
|
|
|
|
- public void cacheProject(ProjectMessageDTO projectMessageDTO) {
|
|
|
|
- log.info("判断用户是否拥有可分配资源:" + projectMessageDTO);
|
|
|
|
- //1 读取 kafka 的 project 信息
|
|
|
|
- String projectId = projectMessageDTO.getProjectId(); // 手动执行项目 id 或 自动执行子项目 id
|
|
|
|
- long parallelism = projectMessageDTO.getParallelism(); // 项目并行度
|
|
|
|
- String projectType = projectMessageDTO.getType(); // 项目类型
|
|
|
|
- //2 根据 projectId 获取创建用户 id
|
|
|
|
- String userId;
|
|
|
|
- if (DictConstants.PROJECT_TYPE_MANUAL.equals(projectType)) {
|
|
|
|
- userId = manualProjectMapper.selectCreateUserById(projectId);
|
|
|
|
- } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(projectType)) {
|
|
|
|
- userId = autoSubProjectMapper.selectCreateUserById(projectId);
|
|
|
|
- } else {
|
|
|
|
- log.error("项目类型错误:" + projectMessageDTO);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- if (StringUtil.isEmpty(userId)) {
|
|
|
|
- log.error("未查询到项目创建人:" + projectMessageDTO);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- //3 获取用户类型(管理员账户、管理员子账户、普通账户、普通子账户)(独占、共享)
|
|
|
|
- UserEntity userEntity = userMapper.selectById(userId);
|
|
|
|
- log.info("项目 " + projectId + " 的创建人为:" + userEntity);
|
|
|
|
- String roleCode = userEntity.getRoleCode();
|
|
|
|
- String useType = userEntity.getUseType();
|
|
|
|
- ClusterEntity clusterEntity;
|
|
|
|
- if (DictConstants.ROLE_CODE_SYSADMIN.equals(roleCode) || DictConstants.ROLE_CODE_ADMIN.equals(roleCode)) { //3-1 管理员账户和管理员子账户直接执行
|
|
|
|
- log.info("项目 " + projectId + " 的创建人 " + userId + " 为管理员账户或管理员子账户,直接判断服务器能否执行。");
|
|
|
|
- PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(DictConstants.SYSTEM_CLUSTER_ID, projectId);
|
|
|
|
- run(projectMessageDTO, DictConstants.SYSTEM_CLUSTER_ID, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
|
- return;
|
|
|
|
- } else if (DictConstants.ROLE_CODE_UESR.equals(roleCode)) { //3-2 普通账户,不管是独占还是共享,都在自己的集群里排队,根据自己的独占节点排队
|
|
|
|
- clusterEntity = clusterMapper.selectByUserId(userId);
|
|
|
|
- log.info("项目 " + projectId + " 的创建人 " + userId + " 为普通账户(包括独占或共享都在自己的集群),集群为:" + clusterEntity);
|
|
|
|
- } else if (DictConstants.ROLE_CODE_SUBUESR.equals(roleCode)) {
|
|
|
|
- if (DictConstants.USER_TYPE_EXCLUSIVE.equals(useType)) { //3-3 普通子账户,根据自己的独占节点排队
|
|
|
|
- clusterEntity = clusterMapper.selectByUserId(userId);
|
|
|
|
- log.info("项目 " + projectId + " 的创建人 " + userId + " 为普通独占子账户(自己的集群),集群为:" + clusterEntity);
|
|
|
|
- } else { //3-4 共享子账户,根据父账户的共享节点排队
|
|
|
|
- String parentUserId = userEntity.getCreateUserId();
|
|
|
|
- clusterEntity = clusterMapper.selectByUserId(parentUserId);
|
|
|
|
- log.info("项目 " + projectId + " 的创建人 " + userId + " 为普通共享子账户(父账户的集群),集群为:" + clusterEntity);
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- log.error("项目 " + projectId + " 的创建人 " + userId + " 为未知账户类型,不予执行!");
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- // 获取拥有的节点数量,即仿真软件证书数量
|
|
|
|
- String clusterId = clusterEntity.getId();
|
|
|
|
- int simulationLicenseNumber = clusterEntity.getNumSimulationLicense();
|
|
|
|
- // 获取该集群中正在运行的项目,如果没有则立即执行
|
|
|
|
- PrefixEntity redisPrefix = projectUtil.getRedisPrefixByClusterIdAndProjectId(clusterId, projectId);
|
|
|
|
- // 获取正在运行的项目的并行度总和
|
|
|
|
- int currentParallelismSum = projectUtil.getCurrentParallelismSum(redisPrefix.getClusterRunningPrefix());
|
|
|
|
- // 如果执行后的并行度总和小于最大节点数则执行,否则不执行
|
|
|
|
- if (currentParallelismSum + parallelism <= simulationLicenseNumber) {
|
|
|
|
- run(projectMessageDTO, clusterId, redisPrefix.getProjectRunningKey(), redisPrefix.getProjectWaitingKey());
|
|
|
|
- } else {
|
|
|
|
- log.info("项目 " + projectId + " 并行度超出账户允许,加入等待队列,暂不执行。 ");
|
|
|
|
- wait(redisPrefix.getProjectWaitingKey(), projectMessageDTO);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- //* -------------------------------- 运行 --------------------------------
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * @param projectMessageDTO 初始接收到的项目启动信息
|
|
|
|
- * @param clusterId 集群 id
|
|
|
|
- * @param projectRunningKey projectRunningKey
|
|
|
|
- * @param projectWaitingKey projectWaitingKey
|
|
|
|
- */
|
|
|
|
- public void run(ProjectMessageDTO projectMessageDTO, String clusterId, String projectRunningKey, String projectWaitingKey) {
|
|
|
|
-
|
|
|
|
- String projectId = projectMessageDTO.getProjectId();
|
|
|
|
- int parallelism = projectMessageDTO.getParallelism(); // 期望并行度
|
|
|
|
- //1 获取集群剩余可用并行度
|
|
|
|
- int restParallelism = projectUtil.getRestParallelism();
|
|
|
|
- //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
|
|
|
|
- if (restParallelism > 0L) {
|
|
|
|
- log.info("集群 " + clusterId + " 执行项目 " + projectId);
|
|
|
|
- // 设置实际的并行度
|
|
|
|
- projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism)); // 设置实际的并行度
|
|
|
|
- parseProject(projectMessageDTO, projectRunningKey);
|
|
|
|
- } else {
|
|
|
|
- log.info("服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
|
|
|
|
- wait(projectWaitingKey, projectMessageDTO);
|
|
|
|
|
|
+ //* -------------------------------- 运行 --------------------------------
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * @param projectMessageDTO 初始接收到的项目启动信息
|
|
|
|
+ * @param clusterId 集群 id
|
|
|
|
+ * @param projectRunningKey projectRunningKey
|
|
|
|
+ * @param projectWaitingKey projectWaitingKey
|
|
|
|
+ */
|
|
|
|
+ public void run(ProjectMessageDTO projectMessageDTO, String clusterId, String projectRunningKey, String projectWaitingKey) {
|
|
|
|
+
|
|
|
|
+ String projectId = projectMessageDTO.getProjectId();
|
|
|
|
+ int parallelism = projectMessageDTO.getParallelism(); // 期望并行度
|
|
|
|
+ //1 获取集群剩余可用并行度
|
|
|
|
+ int restParallelism = projectUtil.getRestParallelism();
|
|
|
|
+ //2 判断剩余可用并行度是否大于项目并行度,否则加入扩充队列
|
|
|
|
+ if (restParallelism > 0L) {
|
|
|
|
+ log.info("集群 " + clusterId + " 执行项目 " + projectId);
|
|
|
|
+ // 设置实际的并行度
|
|
|
|
+ projectMessageDTO.setCurrentParallelism(Math.min(restParallelism, parallelism)); // 设置实际的并行度
|
|
|
|
+ parseProject(projectMessageDTO, projectRunningKey);
|
|
|
|
+ } else {
|
|
|
|
+ log.info("服务器资源不够,项目 " + projectId + " 暂时加入等待队列。");
|
|
|
|
+ wait(projectWaitingKey, projectMessageDTO);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * @param projectMessageDTO 初始接收到的项目启动信息
|
|
|
|
- * @param projectRunningKey projectRunningKey
|
|
|
|
- */
|
|
|
|
- @SneakyThrows
|
|
|
|
- public void parseProject(ProjectMessageDTO projectMessageDTO, String projectRunningKey) {
|
|
|
|
- String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
|
- String modelType = projectMessageDTO.getModelType();
|
|
|
|
- String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
|
|
|
|
- ProjectEntity projectEntity = projectUtil.getProjectByProjectId(projectId);
|
|
|
|
- String isChoiceGpu = projectEntity.getIsChoiceGpu();
|
|
|
|
- int currentParallelism = projectMessageDTO.getCurrentParallelism(); // 当前并行度
|
|
|
|
- String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
|
|
- String projectPath = linuxTempPath + "project/" + projectId + "/";
|
|
|
|
- // -------------------------------- 1 获取任务 json 列表 --------------------------------
|
|
|
|
- List<String> taskJsonList = FileUtil.listAbsolutePathByTypeAndLength(projectPath, "json", 37);
|
|
|
|
- int taskTotal = taskJsonList.size();
|
|
|
|
- projectMessageDTO.setTaskTotal(taskTotal);
|
|
|
|
- projectMessageDTO.setTaskCompleted(0);
|
|
|
|
- // 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
|
|
|
|
- //1 获取剩余并行度和即将使用的各node的并行度
|
|
|
|
- Map<String, Integer> nodeMap0 = projectUtil.getNodeMap();
|
|
|
|
- Map<String, Integer> nodeMap = projectUtil.getNodeMapToUse(Math.min(currentParallelism, taskTotal));
|
|
|
|
- //2 将指定 node 的并行度减少
|
|
|
|
- nodeMap.keySet().forEach(nodeName -> projectUtil.decrementParallelismOfGpuNode(nodeName, nodeMap.get(nodeName)));
|
|
|
|
- // 重新设置实际使用的并行度并保存到 redis
|
|
|
|
- int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
|
|
|
|
- projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
|
|
|
|
- log.info("项目 " + projectId + " 运行在:" + nodeMap);
|
|
|
|
- stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
|
|
|
|
- //* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
|
|
|
|
- String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
|
|
|
|
- // -------------------------------- 4 发送任务消息 --------------------------------
|
|
|
|
- List<NodeEntity> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
|
|
|
|
- int messageNumber = 0;
|
|
|
|
- ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1); // 创建主题
|
|
|
|
- TimeUnit.SECONDS.sleep(7);
|
|
|
|
- // 需要即时启动的任务(并行度的大小)
|
|
|
|
- CopyOnWriteArrayList<String> yamlToRunRedisKeyList = new CopyOnWriteArrayList<>();
|
|
|
|
- for (String taskJsonPath : taskJsonList) {
|
|
|
|
- String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
|
|
|
|
- // 保存运行中的任务信息
|
|
|
|
- String taskMessageKey = projectRunningKey + ":task:" + taskId + ":message";
|
|
|
|
- String taskJson = FileUtil.read(taskJsonPath);
|
|
|
|
- stringRedisTemplate.opsForValue().set(taskMessageKey, taskJson);
|
|
|
|
-
|
|
|
|
- //4-5 将 projectId 作为 topic 名称,根据 parallelism 分散发送 task 信息到 kafka
|
|
|
|
- SendResult<String, String> stringStringSendResult = kafkaTemplate.send(projectId, messageNumber % currentParallelism, taskId, taskJson).get();
|
|
|
|
- RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
|
|
|
|
- String topic = recordMetadata.topic(); // 消息发送到的topic
|
|
|
|
- int partition = recordMetadata.partition(); // 消息发送到的分区
|
|
|
|
- long offset = recordMetadata.offset(); // 消息在分区内的offset
|
|
|
|
- log.info("发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
|
|
|
|
- //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
|
|
|
|
- // 选一个 count 最少的 node
|
|
|
|
- String currentNodeName = "";
|
|
|
|
- NodeEntity currentNodeEntity = null;
|
|
|
|
- int currentCount = Integer.MAX_VALUE;
|
|
|
|
- log.info("各节点已经预定的任务个数为:" + nodeListToCount);
|
|
|
|
- for (NodeEntity nodeEntity : nodeListToCount) {
|
|
|
|
- int tempCount = nodeEntity.getCount();
|
|
|
|
- String tempNodeName = nodeEntity.getNodeName();
|
|
|
|
- if (tempCount < currentCount) {
|
|
|
|
- currentCount = tempCount;
|
|
|
|
- currentNodeName = tempNodeName;
|
|
|
|
- currentNodeEntity = nodeEntity;
|
|
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * @param projectMessageDTO 初始接收到的项目启动信息
|
|
|
|
+ * @param projectRunningKey projectRunningKey
|
|
|
|
+ */
|
|
|
|
+ @SneakyThrows
|
|
|
|
+ public void parseProject(ProjectMessageDTO projectMessageDTO, String projectRunningKey) {
|
|
|
|
+ String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
|
+ String modelType = projectMessageDTO.getModelType();
|
|
|
|
+ String vehicleConfigId = projectMessageDTO.getVehicleConfigId();
|
|
|
|
+ ProjectEntity projectEntity = projectUtil.getProjectByProjectId(projectId);
|
|
|
|
+ String isChoiceGpu = projectEntity.getIsChoiceGpu();
|
|
|
|
+ int currentParallelism = projectMessageDTO.getCurrentParallelism(); // 当前并行度
|
|
|
|
+ String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
|
|
+ String projectPath = linuxTempPath + "project/" + projectId + "/";
|
|
|
|
+ // -------------------------------- 1 获取任务 json 列表 --------------------------------
|
|
|
|
+ List<String> taskJsonList = FileUtil.listAbsolutePathByTypeAndLength(projectPath, "json", 37);
|
|
|
|
+ int taskTotal = taskJsonList.size();
|
|
|
|
+ projectMessageDTO.setTaskTotal(taskTotal);
|
|
|
|
+ projectMessageDTO.setTaskCompleted(0);
|
|
|
|
+ // 设置任务数量之后,获取运行节点,并将项目运行信息放入 redis
|
|
|
|
+ //1 获取剩余并行度和即将使用的各node的并行度
|
|
|
|
+ Map<String, Integer> nodeMap0 = projectUtil.getNodeMap();
|
|
|
|
+ Map<String, Integer> nodeMap = projectUtil.getNodeMapToUse(Math.min(currentParallelism, taskTotal));
|
|
|
|
+ //2 将指定 node 的并行度减少
|
|
|
|
+ nodeMap.keySet().forEach(nodeName -> projectUtil.decrementParallelismOfGpuNode(nodeName, nodeMap.get(nodeName)));
|
|
|
|
+ // 重新设置实际使用的并行度并保存到 redis
|
|
|
|
+ int realCurrentParallelism = nodeMap.values().stream().mapToInt(parallelism -> parallelism).sum();
|
|
|
|
+ projectMessageDTO.setCurrentParallelism(realCurrentParallelism);
|
|
|
|
+ log.info("项目 " + projectId + " 运行在:" + nodeMap);
|
|
|
|
+ stringRedisTemplate.opsForValue().set(projectRunningKey, JsonUtil.beanToJson(projectMessageDTO));
|
|
|
|
+ //* -------------------------------- 3 根据算法id查询算法名称 --------------------------------
|
|
|
|
+ String algorithmDockerImage = algorithmMapper.selectDockerImageById(algorithmId);
|
|
|
|
+ // -------------------------------- 4 发送任务消息 --------------------------------
|
|
|
|
+ List<NodeEntity> nodeListToCount = projectUtil.getNodeListToCount(nodeMap);
|
|
|
|
+ int messageNumber = 0;
|
|
|
|
+ ApacheKafkaUtil.createTopic(kafkaAdminClient, projectId, realCurrentParallelism, (short) 1); // 创建主题
|
|
|
|
+ TimeUnit.SECONDS.sleep(7);
|
|
|
|
+ // 需要即时启动的任务(并行度的大小)
|
|
|
|
+ CopyOnWriteArrayList<String> yamlToRunRedisKeyList = new CopyOnWriteArrayList<>();
|
|
|
|
+ for (String taskJsonPath : taskJsonList) {
|
|
|
|
+ String taskId = FileUtil.getFilenameWithoutSuffix(taskJsonPath);
|
|
|
|
+ // 保存运行中的任务信息
|
|
|
|
+ String taskMessageKey = projectRunningKey + ":task:" + taskId + ":message";
|
|
|
|
+ String taskJson = FileUtil.read(taskJsonPath);
|
|
|
|
+ stringRedisTemplate.opsForValue().set(taskMessageKey, taskJson);
|
|
|
|
+
|
|
|
|
+ //4-5 将 projectId 作为 topic 名称,根据 parallelism 分散发送 task 信息到 kafka
|
|
|
|
+ SendResult<String, String> stringStringSendResult = kafkaTemplate.send(projectId, messageNumber % currentParallelism, taskId, taskJson).get();
|
|
|
|
+ RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
|
|
|
|
+ String topic = recordMetadata.topic(); // 消息发送到的topic
|
|
|
|
+ int partition = recordMetadata.partition(); // 消息发送到的分区
|
|
|
|
+ long offset = recordMetadata.offset(); // 消息在分区内的offset
|
|
|
|
+ log.info("发送消息成功, 主题 topic 为:" + topic + " 分区 partition 为:" + partition + " 偏移量为:" + offset + " 消息体为:" + taskJson);
|
|
|
|
+ //4-6 发送成功过的同时创建 pod.yaml 文件并把文件地址存到 redis
|
|
|
|
+ // 选一个 count 最少的 node
|
|
|
|
+ String currentNodeName = "";
|
|
|
|
+ NodeEntity currentNodeEntity = null;
|
|
|
|
+ int currentCount = Integer.MAX_VALUE;
|
|
|
|
+ log.info("各节点已经预定的任务个数为:" + nodeListToCount);
|
|
|
|
+ for (NodeEntity nodeEntity : nodeListToCount) {
|
|
|
|
+ int tempCount = nodeEntity.getCount();
|
|
|
|
+ String tempNodeName = nodeEntity.getNodeName();
|
|
|
|
+ if (tempCount < currentCount) {
|
|
|
|
+ currentCount = tempCount;
|
|
|
|
+ currentNodeName = tempNodeName;
|
|
|
|
+ currentNodeEntity = nodeEntity;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (currentNodeEntity == null) {
|
|
|
|
+ String errorMessage = "挑选节点失败。";
|
|
|
|
+ log.info(errorMessage);
|
|
|
|
+ throw new RuntimeException(errorMessage);
|
|
|
|
+ }
|
|
|
|
+ currentNodeEntity.setCount(currentNodeEntity.getCount() + 1);
|
|
|
|
+ Integer cpuOrder = null;
|
|
|
|
+ if (currentCount == 0) {
|
|
|
|
+ // 根据各节点剩余并行度,倒序获取 cpu 编号
|
|
|
|
+ cpuOrder = nodeMap0.get(currentNodeName) - 1;
|
|
|
|
+ nodeMap0.put(currentNodeName, cpuOrder);
|
|
|
|
+ }
|
|
|
|
+ // 只有准备启动(即 currentCount == 0)的时候才指定 cpu 编号
|
|
|
|
+ log.info("创建任务 " + taskId + " 的 yaml:是否使用 gpu (0是1否)" + isChoiceGpu + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前节点名称为:" + currentNodeName + ",当前 cpu 编号为:" + cpuOrder);
|
|
|
|
+ String yamlRedisKey = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
|
|
|
|
+
|
|
|
|
+ if (currentCount == 0) {
|
|
|
|
+ String podName = yamlRedisKey.split(":")[yamlRedisKey.split(":").length - 1];
|
|
|
|
+ log.info("将 pod 加入到启动列表 " + podName);
|
|
|
|
+ yamlToRunRedisKeyList.add(yamlRedisKey);
|
|
|
|
+ }
|
|
|
|
+ messageNumber++;
|
|
|
|
+ }
|
|
|
|
+ TimeUnit.SECONDS.sleep(6);
|
|
|
|
+ log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
|
|
|
|
+ for (String redisKey : yamlToRunRedisKeyList) {
|
|
|
|
+ projectUtil.createPodBegin(projectId, redisKey);
|
|
}
|
|
}
|
|
- }
|
|
|
|
- if (currentNodeEntity == null) {
|
|
|
|
- String errorMessage = "挑选节点失败。";
|
|
|
|
- log.info(errorMessage);
|
|
|
|
- throw new RuntimeException(errorMessage);
|
|
|
|
- }
|
|
|
|
- currentNodeEntity.setCount(currentNodeEntity.getCount() + 1);
|
|
|
|
- Integer cpuOrder = null;
|
|
|
|
- if (currentCount == 0) {
|
|
|
|
- // 根据各节点剩余并行度,倒序获取 cpu 编号
|
|
|
|
- cpuOrder = nodeMap0.get(currentNodeName) - 1;
|
|
|
|
- nodeMap0.put(currentNodeName, cpuOrder);
|
|
|
|
- }
|
|
|
|
- // 只有准备启动(即 currentCount == 0)的时候才指定 cpu 编号
|
|
|
|
- log.info("创建任务 " + taskId + " 的 yaml:是否使用 gpu (0是1否)" + isChoiceGpu + ",当前节点已创建 yaml 个数为:" + currentCount + ",当前节点名称为:" + currentNodeName + ",当前 cpu 编号为:" + cpuOrder);
|
|
|
|
- String yamlRedisKey = projectManager.createTempYaml(projectId, vehicleConfigId, modelType, algorithmDockerImage, currentNodeName, partition, offset, isChoiceGpu, cpuOrder);
|
|
|
|
-
|
|
|
|
- if (currentCount == 0) {
|
|
|
|
- String podName = yamlRedisKey.split(":")[yamlRedisKey.split(":").length - 1];
|
|
|
|
- log.info("将 pod 加入到启动列表 " + podName);
|
|
|
|
- yamlToRunRedisKeyList.add(yamlRedisKey);
|
|
|
|
- }
|
|
|
|
- messageNumber++;
|
|
|
|
|
|
+ log.info("项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
|
|
}
|
|
}
|
|
- TimeUnit.SECONDS.sleep(6);
|
|
|
|
- log.info("项目 " + projectId + " 共发送了 " + messageNumber + " 条消息,准备首先启动 " + yamlToRunRedisKeyList);
|
|
|
|
- for (String redisKey : yamlToRunRedisKeyList) {
|
|
|
|
- projectUtil.createPodBegin(projectId, redisKey);
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ //* -------------------------------- 等待 --------------------------------
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * @param projectWaitingKey 项目等待 key
|
|
|
|
+ * @param projectMessageDTO 项目信息
|
|
|
|
+ */
|
|
|
|
+ public void wait(String projectWaitingKey, ProjectMessageDTO projectMessageDTO) {
|
|
|
|
+ stringRedisTemplate.opsForValue().set(projectWaitingKey, JsonUtil.beanToJson(projectMessageDTO));
|
|
}
|
|
}
|
|
- log.info("项目 " + projectId + " 已经启动 " + yamlToRunRedisKeyList);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- //* -------------------------------- 等待 --------------------------------
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * @param projectWaitingKey 项目等待 key
|
|
|
|
- * @param projectMessageDTO 项目信息
|
|
|
|
- */
|
|
|
|
- public void wait(String projectWaitingKey, ProjectMessageDTO projectMessageDTO) {
|
|
|
|
- stringRedisTemplate.opsForValue().set(projectWaitingKey, JsonUtil.beanToJson(projectMessageDTO));
|
|
|
|
- }
|
|
|
|
- //* -------------------------------- 结束 --------------------------------
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * {
|
|
|
|
- * "projectId": "sadfasdfs", // 项目 id
|
|
|
|
- * "type": "1", // 项目类型
|
|
|
|
- * }
|
|
|
|
- */
|
|
|
|
- @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")
|
|
|
|
- @SneakyThrows
|
|
|
|
- public void stopProject(ConsumerRecord<String, String> stopRecord) {
|
|
|
|
- log.info("接收到的项目终止消息为:" + stopRecord);
|
|
|
|
- //1 读取 kafka 的项目停止信息
|
|
|
|
- String json = stopRecord.value();
|
|
|
|
- ObjectMapper objectMapper = new ObjectMapper();
|
|
|
|
- JsonNode jsonNode = objectMapper.readTree(json);
|
|
|
|
- // 将项目状态修改为终止中
|
|
|
|
- String projectId = jsonNode.path("projectId").asText();
|
|
|
|
- String type = jsonNode.path("type").asText();
|
|
|
|
- if (DictConstants.PROJECT_TYPE_MANUAL.equals(type)) {
|
|
|
|
- manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATING, TimeUtil.getNowForMysql());
|
|
|
|
- } else if (DictConstants.PROJECT_TYPE_AUTO_SUB.equals(type)) {
|
|
|
|
- autoSubProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATING, TimeUtil.getNowForMysql());
|
|
|
|
|
|
+ //* -------------------------------- 结束 --------------------------------
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * {
|
|
|
|
+ * "projectId": "sadfasdfs", // 项目 id
|
|
|
|
+ * "type": "1", // 项目类型
|
|
|
|
+ * }
|
|
|
|
+ */
|
|
|
|
+ @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.stop-topic}")
|
|
|
|
+ @SneakyThrows
|
|
|
|
+ public void stopProject(ConsumerRecord<String, String> stopRecord) {
|
|
|
|
+ log.info("接收到的项目终止消息为:" + stopRecord);
|
|
|
|
+ JsonNode jsonNode = new ObjectMapper().readTree(stopRecord.value());
|
|
|
|
+ String projectId = jsonNode.path("projectId").asText();
|
|
|
|
+ String type = jsonNode.path("type").asText();
|
|
|
|
+ projectService.stopProject(projectId, type);
|
|
}
|
|
}
|
|
- projectService.stopProject(projectId, type);
|
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
}
|