|
@@ -3,31 +3,30 @@ package com.css.simulation.resource.scheduler.consumer;
|
|
|
|
|
|
import api.common.pojo.constants.DictConstants;
|
|
|
import api.common.pojo.dto.ProjectMessageDTO;
|
|
|
-import api.common.pojo.param.KafkaParameter;
|
|
|
-import api.common.pojo.param.MinioParameter;
|
|
|
-import api.common.pojo.param.RedisParameter;
|
|
|
-import api.common.util.*;
|
|
|
-import com.css.simulation.resource.scheduler.feign.CommonService;
|
|
|
+import api.common.util.JsonUtil;
|
|
|
+import api.common.util.LinuxUtil;
|
|
|
+import api.common.util.StringUtil;
|
|
|
+import api.common.util.TimeUtil;
|
|
|
import com.css.simulation.resource.scheduler.mapper.*;
|
|
|
-import com.css.simulation.resource.scheduler.pojo.to.*;
|
|
|
import com.css.simulation.resource.scheduler.pojo.po.*;
|
|
|
-import feign.Response;
|
|
|
+import com.css.simulation.resource.scheduler.pojo.to.*;
|
|
|
+import com.css.simulation.resource.scheduler.util.MinioUtil;
|
|
|
import io.kubernetes.client.openapi.ApiClient;
|
|
|
-import io.kubernetes.client.openapi.ApiException;
|
|
|
import io.kubernetes.client.openapi.apis.BatchV1Api;
|
|
|
-import io.kubernetes.client.openapi.models.*;
|
|
|
+import io.kubernetes.client.openapi.models.V1Job;
|
|
|
import io.kubernetes.client.util.Yaml;
|
|
|
+import io.minio.MinioClient;
|
|
|
import lombok.SneakyThrows;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.data.redis.core.StringRedisTemplate;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.ResourceUtils;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
-import java.io.InputStream;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
@@ -37,7 +36,14 @@ import java.util.List;
|
|
|
public class ManualProjectConsumer {
|
|
|
|
|
|
private static final String USER_ID = "simulation-resource-scheduler";
|
|
|
-
|
|
|
+ @Autowired
|
|
|
+ MinioClient minioClient;
|
|
|
+ @Value("${minio.bucket-name}")
|
|
|
+ String bucketName;
|
|
|
+ @Autowired
|
|
|
+ KafkaTemplate<String, String> kafkaTemplate;
|
|
|
+ @Autowired
|
|
|
+ StringRedisTemplate redisTemplate;
|
|
|
@Autowired
|
|
|
ProjectMapper projectMapper;
|
|
|
@Autowired
|
|
@@ -53,8 +59,6 @@ public class ManualProjectConsumer {
|
|
|
@Autowired
|
|
|
SensorOgtMapper sensorOgtMapper;
|
|
|
@Autowired
|
|
|
- CommonService commonService;
|
|
|
- @Autowired
|
|
|
AlgorithmMapper algorithmMapper;
|
|
|
@Autowired
|
|
|
ApiClient apiClient;
|
|
@@ -153,7 +157,7 @@ public class ManualProjectConsumer {
|
|
|
taskPO.setIsDeleted("0");
|
|
|
taskMapper.insert(taskPO);
|
|
|
// 心跳信息存在緩存中
|
|
|
- commonService.set(new RedisParameter(manualProjectTopic + ":" + projectId + ":" + taskId, TimeUtil.getNow() + "", 0));
|
|
|
+ redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId, TimeUtil.getNowString());
|
|
|
// 组装 task 消息
|
|
|
TaskTO taskTO = TaskTO.builder()
|
|
|
.info(InfoTO.builder()
|
|
@@ -199,198 +203,20 @@ public class ManualProjectConsumer {
|
|
|
//4-4 将对象转成 json
|
|
|
String taskJson = JsonUtil.beanToJson(taskTO);
|
|
|
//4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
|
|
|
- commonService.send(new KafkaParameter(projectId, taskJson));
|
|
|
+ kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
|
|
|
+ // 消息发送到的topic
|
|
|
+ String topic = success.getRecordMetadata().topic();
|
|
|
+ // 消息发送到的分区
|
|
|
+ int partition = success.getRecordMetadata().partition();
|
|
|
+ // 消息在分区内的offset
|
|
|
+ long offset = success.getRecordMetadata().offset();
|
|
|
+ log.info("发送消息成功:" + topic + "-" + partition + "-" + offset);
|
|
|
+ }, failure -> {
|
|
|
+ log.error("发送消息失败:" + failure.getMessage());
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
// -------------------------------- 4 算法导入(一期按单机版做) --------------------------------
|
|
|
-// // 私有仓库导入算法镜像(搭建私有仓库)
|
|
|
-// String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
|
-// //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
|
|
|
-// AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
|
|
|
-// String minioPath = algorithmPO.getMinioPath();
|
|
|
-// String dockerImage;
|
|
|
-// if ("0".equals(algorithmPO.getDockerImport())) {
|
|
|
-// dockerImage = "algorithm_" + algorithmId + ":latest";
|
|
|
-// String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
|
|
|
-// // 下载算法文件到本地( 2 到仓库服务器)
|
|
|
-// Response response = commonService.download(new MinioParameter(minioPath));
|
|
|
-// InputStream inputStream = response.body().asInputStream();
|
|
|
-// FileUtil.writeInputStreamToLocalFile(inputStream, algorithmTarLinuxTempPath);
|
|
|
-// //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
|
|
|
-// LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
|
|
|
-// } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
|
|
|
-// dockerImage = algorithmPO.getDockerImage();
|
|
|
-// } else {
|
|
|
-// throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
|
|
|
-// }
|
|
|
- // -------------------------------- 5 创建 pod 开始执行 --------------------------------
|
|
|
- int completions = sceneList.size(); // 结束标
|
|
|
- int parallelism = projectMessageDTO.getParallelism(); // 并行度
|
|
|
- BatchV1Api batchV1Api = new BatchV1Api(apiClient);
|
|
|
-// V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
|
|
|
- V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-test.yaml"));
|
|
|
- //1 apiVersion
|
|
|
- //2 kind
|
|
|
- //3 metadata
|
|
|
- V1ObjectMeta metadata = yaml.getMetadata();
|
|
|
- metadata.setName("project_" + projectId);
|
|
|
- yaml.setMetadata(metadata);
|
|
|
- //4 job
|
|
|
- V1JobSpec job = yaml.getSpec();
|
|
|
- job.setCompletions(completions); // 这个标准是什么?
|
|
|
- job.setParallelism(parallelism);
|
|
|
- //5 pod
|
|
|
- V1PodSpec v1PodSpec = job.getTemplate().getSpec();
|
|
|
- //6 container
|
|
|
- List<V1Container> containers = v1PodSpec.getContainers();
|
|
|
- for (V1Container container : containers) {
|
|
|
- String name = container.getName();
|
|
|
- if ("vtd".equals(name)) {
|
|
|
- container.setName("vtd_" + projectId);
|
|
|
- }
|
|
|
- if ("algorithm".equals(name)) {
|
|
|
- container.setName("algorithm_" + projectId);
|
|
|
-// container.setImage(dockerImage);
|
|
|
- }
|
|
|
- }
|
|
|
- //4-4 创建
|
|
|
- yaml.setSpec(job);
|
|
|
- batchV1Api.createNamespacedJob("simulation-cloud", yaml, null, null, null);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- // public void testParseProject(ConsumerRecord<String, String> projectRecord) throws IOException, ApiException {
|
|
|
- public void testParseProject(String projectJson) throws IOException, ApiException {
|
|
|
-// System.out.println("------- 接收到消息为:" + projectRecord);
|
|
|
- System.out.println("------- 接收到消息为:" + projectJson);
|
|
|
- //1 读取 kafka 的 project 信息
|
|
|
- /*
|
|
|
- {
|
|
|
- "projectId": "sadfasdfs", // 项目 id
|
|
|
- "algorithmId": "sadfasdfs", // 算法 id
|
|
|
- "vehicleConfigId": "sadfasdfs", // 车辆 id
|
|
|
- "scenePackageId": "sadfasdfs", // 场景包 id
|
|
|
- "maxSimulationTime": 11111, // 最大仿真时间
|
|
|
- "parallelism": 30 // 并行度
|
|
|
- }
|
|
|
- */
|
|
|
-// String projectJson = projectRecord.value();
|
|
|
- ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
|
- String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
- projectMapper.updateProjectState(projectId, "20"); // 修改该 project 的状态为执行中
|
|
|
-
|
|
|
-
|
|
|
- // -------------------------------- 1 场景 --------------------------------
|
|
|
- // 根据 packageId,拿到场景 id,然后获取每个场景的文件地址。
|
|
|
- String packageId = projectMessageDTO.getScenePackageId();
|
|
|
- List<IndexTemplatePO> leafIndexList = indexTemplateMapper.selectLeafIndexByPackageId(packageId);
|
|
|
- List<String> naturalIdList = new ArrayList<>();
|
|
|
- List<String> standardIdList = new ArrayList<>();
|
|
|
- List<String> accidentIdList = new ArrayList<>();
|
|
|
- for (IndexTemplatePO indexTemplatePO : leafIndexList) {
|
|
|
- String naturalIds = indexTemplatePO.getSceneNaturalIds();
|
|
|
- String standardIds = indexTemplatePO.getSceneStatueIds();
|
|
|
- String accidentIds = indexTemplatePO.getSceneTrafficIds();
|
|
|
- if (StringUtil.isNotEmpty(naturalIds)) {
|
|
|
- String[] naturalIdArray = naturalIds.split(",");
|
|
|
- naturalIdList.addAll(Arrays.asList(naturalIdArray));
|
|
|
- }
|
|
|
- if (StringUtil.isNotEmpty(standardIds)) {
|
|
|
- String[] standardArray = standardIds.split(",");
|
|
|
- standardIdList.addAll(Arrays.asList(standardArray));
|
|
|
- }
|
|
|
- if (StringUtil.isNotEmpty(accidentIds)) {
|
|
|
- String[] accidentIdArray = accidentIds.split(",");
|
|
|
- accidentIdList.addAll(Arrays.asList(accidentIdArray));
|
|
|
- }
|
|
|
- }
|
|
|
- List<ScenePO> sceneList = new ArrayList<>();
|
|
|
- sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
|
|
|
- sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
|
|
|
- sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
|
|
|
- projectMapper.updateTaskNumber(projectId, sceneList.size()); // 有多少场景就有多少任务
|
|
|
- // -------------------------------- 2 模型 --------------------------------
|
|
|
- // 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
|
- String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
|
|
|
- VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 模型
|
|
|
- List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
- List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
-
|
|
|
- // -------------------------------- 3 任务消息 --------------------------------
|
|
|
- // 根据场景创建任务,组装 task 消息
|
|
|
- int maxSimulationTime = projectMessageDTO.getMaxSimulationTime();
|
|
|
-
|
|
|
- for (ScenePO scenePO : sceneList) {
|
|
|
- String taskId = StringUtil.getRandomUUID();
|
|
|
- String resultPath = linuxTempPath + projectId + "/" + taskId;
|
|
|
- // 保存任务信息
|
|
|
- TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
|
|
|
- .id(taskId)
|
|
|
- .pId(projectId)
|
|
|
- .sceneId(scenePO.getId())
|
|
|
- .sceneName(scenePO.getName())
|
|
|
- .sceneType(scenePO.getType())
|
|
|
- .runState(DictConstants.TASK_PENDING)
|
|
|
- .runResult(resultPath)
|
|
|
- .build();
|
|
|
- taskPO.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
- taskPO.setCreateUserId(USER_ID);
|
|
|
- taskPO.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
- taskPO.setModifyUserId(USER_ID);
|
|
|
- taskPO.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
- taskPO.setIsDeleted("0");
|
|
|
- taskMapper.insert(taskPO);
|
|
|
- // 心跳信息存在緩存中
|
|
|
- commonService.set(new RedisParameter(manualProjectTopic + ":" + projectId + ":" + taskId, TimeUtil.getNow() + "", 0));
|
|
|
- // 组装 task 消息
|
|
|
- TaskTO taskTO = TaskTO.builder()
|
|
|
- .info(InfoTO.builder()
|
|
|
- .project_id(projectId)
|
|
|
- .task_id(taskId)
|
|
|
- .task_path(resultPath)
|
|
|
- .default_time(maxSimulationTime)
|
|
|
- .build())
|
|
|
- .scenario(ScenarioTO.builder()
|
|
|
- .scenario_osc(scenePO.getScenarioOsc())
|
|
|
- .scenario_odr(scenePO.getScenarioOdr())
|
|
|
- .scenario_osgb(scenePO.getScenarioOsgb())
|
|
|
- .build())
|
|
|
- .vehicle(VehicleTO.builder()
|
|
|
- .model(ModelTO.builder()
|
|
|
- .model_label(vehiclePO.getModelLabel())
|
|
|
- .build())
|
|
|
- .dynamics(DynamicsTO.builder()
|
|
|
- .dynamics_maxspeed(vehiclePO.getMaxSpeed())
|
|
|
- .dynamics_enginepower(vehiclePO.getEnginePower())
|
|
|
- .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
|
|
|
- .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
|
|
|
- .dynamics_mass(vehiclePO.getMass())
|
|
|
- .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
|
|
|
- .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
|
|
|
- .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
|
|
|
- .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
|
|
|
- .dynamics_wheeldrive(vehiclePO.getWheelDrive())
|
|
|
- .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
|
|
|
- .dynamics_distfront(vehiclePO.getFrontDistance())
|
|
|
- .dynamics_distrear(vehiclePO.getRearDistance())
|
|
|
- .dynamics_distleft(vehiclePO.getLeftDistance())
|
|
|
- .dynamics_distright(vehiclePO.getRightDistance())
|
|
|
- .dynamics_distheight(vehiclePO.getHeightDistance())
|
|
|
- .dynamics_wheelbase(vehiclePO.getWheelbase())
|
|
|
- .build())
|
|
|
- .sensors(SensorsTO.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
- .camera(cameraPOList)
|
|
|
- .OGT(ogtPOList)
|
|
|
- .build())
|
|
|
- .build())
|
|
|
- .build();
|
|
|
- //4-4 将对象转成 json
|
|
|
- String taskJson = JsonUtil.beanToJson(taskTO);
|
|
|
- //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
|
|
|
- commonService.send(new KafkaParameter(projectId, taskJson));
|
|
|
- }
|
|
|
-
|
|
|
- // -------------------------------- 4 算法(一期按单机版做) --------------------------------
|
|
|
// 私有仓库导入算法镜像(搭建私有仓库)
|
|
|
String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
|
//4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
|
|
@@ -401,9 +227,7 @@ public class ManualProjectConsumer {
|
|
|
dockerImage = "algorithm_" + algorithmId + ":latest";
|
|
|
String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
|
|
|
// 下载算法文件到本地( 2 到仓库服务器)
|
|
|
- Response response = commonService.download(new MinioParameter(minioPath));
|
|
|
- InputStream inputStream = response.body().asInputStream();
|
|
|
- FileUtil.writeInputStreamToLocalFile(inputStream, algorithmTarLinuxTempPath);
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, minioPath, algorithmTarLinuxTempPath);
|
|
|
//4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
|
|
|
LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
|
|
|
} else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
|
|
@@ -415,36 +239,224 @@ public class ManualProjectConsumer {
|
|
|
int completions = sceneList.size(); // 结束标
|
|
|
int parallelism = projectMessageDTO.getParallelism(); // 并行度
|
|
|
BatchV1Api batchV1Api = new BatchV1Api(apiClient);
|
|
|
- V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
|
|
|
- //1 apiVersion
|
|
|
- //2 kind
|
|
|
- //3 metadata
|
|
|
- V1ObjectMeta metadata = yaml.getMetadata();
|
|
|
- metadata.setName("project_" + projectId);
|
|
|
- yaml.setMetadata(metadata);
|
|
|
- //4 job
|
|
|
- V1JobSpec job = yaml.getSpec();
|
|
|
- job.setCompletions(completions); // 这个标准是什么?
|
|
|
- job.setParallelism(parallelism);
|
|
|
- //5 pod
|
|
|
- V1PodSpec v1PodSpec = job.getTemplate().getSpec();
|
|
|
- //6 container
|
|
|
- List<V1Container> containers = v1PodSpec.getContainers();
|
|
|
- for (V1Container container : containers) {
|
|
|
- String name = container.getName();
|
|
|
- if ("vtd".equals(name)) {
|
|
|
- container.setName("vtd_" + projectId);
|
|
|
- }
|
|
|
- if ("algorithm".equals(name)) {
|
|
|
- container.setName("algorithm_" + projectId);
|
|
|
- container.setImage(dockerImage);
|
|
|
- }
|
|
|
- }
|
|
|
- //4-4 创建
|
|
|
- yaml.setSpec(job);
|
|
|
- batchV1Api.createNamespacedJob("simulation-task", yaml, null, null, null);
|
|
|
-
|
|
|
+// V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
|
|
|
+ V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-test.yaml"));
|
|
|
+// //1 apiVersion
|
|
|
+// //2 kind
|
|
|
+// //3 metadata
|
|
|
+// V1ObjectMeta metadata = yaml.getMetadata();
|
|
|
+// metadata.setName("project_" + projectId);
|
|
|
+// yaml.setMetadata(metadata);
|
|
|
+// //4 job
|
|
|
+// V1JobSpec job = yaml.getSpec();
|
|
|
+// job.setCompletions(completions); // 这个标准是什么?
|
|
|
+// job.setParallelism(parallelism);
|
|
|
+// //5 pod
|
|
|
+// V1PodSpec v1PodSpec = job.getTemplate().getSpec();
|
|
|
+// //6 container
|
|
|
+// List<V1Container> containers = v1PodSpec.getContainers();
|
|
|
+// for (V1Container container : containers) {
|
|
|
+// String name = container.getName();
|
|
|
+// if ("vtd".equals(name)) {
|
|
|
+// container.setName("vtd_" + projectId);
|
|
|
+// }
|
|
|
+// if ("algorithm".equals(name)) {
|
|
|
+// container.setName("algorithm_" + projectId);
|
|
|
+//// container.setImage(dockerImage);
|
|
|
+// }
|
|
|
+// }
|
|
|
+// //4-4 创建
|
|
|
+// yaml.setSpec(job);
|
|
|
+ batchV1Api.createNamespacedJob("simulation-cloud", yaml, null, null, null);
|
|
|
}
|
|
|
|
|
|
|
|
|
+// // public void testParseProject(ConsumerRecord<String, String> projectRecord) throws IOException, ApiException {
|
|
|
+// public void testParseProject(String projectJson) throws IOException, ApiException {
|
|
|
+//// System.out.println("------- 接收到消息为:" + projectRecord);
|
|
|
+// System.out.println("------- 接收到消息为:" + projectJson);
|
|
|
+// //1 读取 kafka 的 project 信息
|
|
|
+// /*
|
|
|
+// {
|
|
|
+// "projectId": "sadfasdfs", // 项目 id
|
|
|
+// "algorithmId": "sadfasdfs", // 算法 id
|
|
|
+// "vehicleConfigId": "sadfasdfs", // 车辆 id
|
|
|
+// "scenePackageId": "sadfasdfs", // 场景包 id
|
|
|
+// "maxSimulationTime": 11111, // 最大仿真时间
|
|
|
+// "parallelism": 30 // 并行度
|
|
|
+// }
|
|
|
+// */
|
|
|
+//// String projectJson = projectRecord.value();
|
|
|
+// ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
|
+// String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
+// projectMapper.updateProjectState(projectId, "20"); // 修改该 project 的状态为执行中
|
|
|
+//
|
|
|
+//
|
|
|
+// // -------------------------------- 1 场景 --------------------------------
|
|
|
+// // 根据 packageId,拿到场景 id,然后获取每个场景的文件地址。
|
|
|
+// String packageId = projectMessageDTO.getScenePackageId();
|
|
|
+// List<IndexTemplatePO> leafIndexList = indexTemplateMapper.selectLeafIndexByPackageId(packageId);
|
|
|
+// List<String> naturalIdList = new ArrayList<>();
|
|
|
+// List<String> standardIdList = new ArrayList<>();
|
|
|
+// List<String> accidentIdList = new ArrayList<>();
|
|
|
+// for (IndexTemplatePO indexTemplatePO : leafIndexList) {
|
|
|
+// String naturalIds = indexTemplatePO.getSceneNaturalIds();
|
|
|
+// String standardIds = indexTemplatePO.getSceneStatueIds();
|
|
|
+// String accidentIds = indexTemplatePO.getSceneTrafficIds();
|
|
|
+// if (StringUtil.isNotEmpty(naturalIds)) {
|
|
|
+// String[] naturalIdArray = naturalIds.split(",");
|
|
|
+// naturalIdList.addAll(Arrays.asList(naturalIdArray));
|
|
|
+// }
|
|
|
+// if (StringUtil.isNotEmpty(standardIds)) {
|
|
|
+// String[] standardArray = standardIds.split(",");
|
|
|
+// standardIdList.addAll(Arrays.asList(standardArray));
|
|
|
+// }
|
|
|
+// if (StringUtil.isNotEmpty(accidentIds)) {
|
|
|
+// String[] accidentIdArray = accidentIds.split(",");
|
|
|
+// accidentIdList.addAll(Arrays.asList(accidentIdArray));
|
|
|
+// }
|
|
|
+// }
|
|
|
+// List<ScenePO> sceneList = new ArrayList<>();
|
|
|
+// sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
|
|
|
+// sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
|
|
|
+// sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
|
|
|
+// projectMapper.updateTaskNumber(projectId, sceneList.size()); // 有多少场景就有多少任务
|
|
|
+// // -------------------------------- 2 模型 --------------------------------
|
|
|
+// // 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
|
+// String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
|
|
|
+// VehiclePO vehiclePO = vehicleMapper.selectByVehicleConfigId(vehicleConfigId); // 模型
|
|
|
+// List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleConfigId(vehicleConfigId); // 摄像头
|
|
|
+// List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleConfigId); // 完美传感器
|
|
|
+//
|
|
|
+// // -------------------------------- 3 任务消息 --------------------------------
|
|
|
+// // 根据场景创建任务,组装 task 消息
|
|
|
+// int maxSimulationTime = projectMessageDTO.getMaxSimulationTime();
|
|
|
+//
|
|
|
+// for (ScenePO scenePO : sceneList) {
|
|
|
+// String taskId = StringUtil.getRandomUUID();
|
|
|
+// String resultPath = linuxTempPath + projectId + "/" + taskId;
|
|
|
+// // 保存任务信息
|
|
|
+// TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
|
|
|
+// .id(taskId)
|
|
|
+// .pId(projectId)
|
|
|
+// .sceneId(scenePO.getId())
|
|
|
+// .sceneName(scenePO.getName())
|
|
|
+// .sceneType(scenePO.getType())
|
|
|
+// .runState(DictConstants.TASK_PENDING)
|
|
|
+// .runResult(resultPath)
|
|
|
+// .build();
|
|
|
+// taskPO.setCreateTime(TimeUtil.getNowForMysql());
|
|
|
+// taskPO.setCreateUserId(USER_ID);
|
|
|
+// taskPO.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
+// taskPO.setModifyUserId(USER_ID);
|
|
|
+// taskPO.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
+// taskPO.setIsDeleted("0");
|
|
|
+// taskMapper.insert(taskPO);
|
|
|
+// // 心跳信息存在緩存中
|
|
|
+// commonService.set(new RedisParameter(manualProjectTopic + ":" + projectId + ":" + taskId, TimeUtil.getNow() + "", 0));
|
|
|
+// // 组装 task 消息
|
|
|
+// TaskTO taskTO = TaskTO.builder()
|
|
|
+// .info(InfoTO.builder()
|
|
|
+// .project_id(projectId)
|
|
|
+// .task_id(taskId)
|
|
|
+// .task_path(resultPath)
|
|
|
+// .default_time(maxSimulationTime)
|
|
|
+// .build())
|
|
|
+// .scenario(ScenarioTO.builder()
|
|
|
+// .scenario_osc(scenePO.getScenarioOsc())
|
|
|
+// .scenario_odr(scenePO.getScenarioOdr())
|
|
|
+// .scenario_osgb(scenePO.getScenarioOsgb())
|
|
|
+// .build())
|
|
|
+// .vehicle(VehicleTO.builder()
|
|
|
+// .model(ModelTO.builder()
|
|
|
+// .model_label(vehiclePO.getModelLabel())
|
|
|
+// .build())
|
|
|
+// .dynamics(DynamicsTO.builder()
|
|
|
+// .dynamics_maxspeed(vehiclePO.getMaxSpeed())
|
|
|
+// .dynamics_enginepower(vehiclePO.getEnginePower())
|
|
|
+// .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
|
|
|
+// .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
|
|
|
+// .dynamics_mass(vehiclePO.getMass())
|
|
|
+// .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
|
|
|
+// .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
|
|
|
+// .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
|
|
|
+// .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
|
|
|
+// .dynamics_wheeldrive(vehiclePO.getWheelDrive())
|
|
|
+// .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
|
|
|
+// .dynamics_distfront(vehiclePO.getFrontDistance())
|
|
|
+// .dynamics_distrear(vehiclePO.getRearDistance())
|
|
|
+// .dynamics_distleft(vehiclePO.getLeftDistance())
|
|
|
+// .dynamics_distright(vehiclePO.getRightDistance())
|
|
|
+// .dynamics_distheight(vehiclePO.getHeightDistance())
|
|
|
+// .dynamics_wheelbase(vehiclePO.getWheelbase())
|
|
|
+// .build())
|
|
|
+// .sensors(SensorsTO.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
+// .camera(cameraPOList)
|
|
|
+// .OGT(ogtPOList)
|
|
|
+// .build())
|
|
|
+// .build())
|
|
|
+// .build();
|
|
|
+// //4-4 将对象转成 json
|
|
|
+// String taskJson = JsonUtil.beanToJson(taskTO);
|
|
|
+// //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
|
|
|
+// commonService.send(new KafkaParameter(projectId, taskJson));
|
|
|
+// }
|
|
|
+//
|
|
|
+// // -------------------------------- 4 算法(一期按单机版做) --------------------------------
|
|
|
+// // 私有仓库导入算法镜像(搭建私有仓库)
|
|
|
+// String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
|
+// //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
|
|
|
+// AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
|
|
|
+// String minioPath = algorithmPO.getMinioPath();
|
|
|
+// String dockerImage;
|
|
|
+// if ("0".equals(algorithmPO.getDockerImport())) {
|
|
|
+// dockerImage = "algorithm_" + algorithmId + ":latest";
|
|
|
+// String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
|
|
|
+// // 下载算法文件到本地( 2 到仓库服务器)
|
|
|
+// Response response = commonService.download(new MinioParameter(minioPath));
|
|
|
+// InputStream inputStream = response.body().asInputStream();
|
|
|
+// FileUtil.writeInputStreamToLocalFile(inputStream, algorithmTarLinuxTempPath);
|
|
|
+// //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
|
|
|
+// LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
|
|
|
+// } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
|
|
|
+// dockerImage = algorithmPO.getDockerImage();
|
|
|
+// } else {
|
|
|
+// throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
|
|
|
+// }
|
|
|
+// // -------------------------------- 5 创建 pod 开始执行 --------------------------------
|
|
|
+// int completions = sceneList.size(); // 结束标
|
|
|
+// int parallelism = projectMessageDTO.getParallelism(); // 并行度
|
|
|
+// BatchV1Api batchV1Api = new BatchV1Api(apiClient);
|
|
|
+// V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
|
|
|
+// //1 apiVersion
|
|
|
+// //2 kind
|
|
|
+// //3 metadata
|
|
|
+// V1ObjectMeta metadata = yaml.getMetadata();
|
|
|
+// metadata.setName("project_" + projectId);
|
|
|
+// yaml.setMetadata(metadata);
|
|
|
+// //4 job
|
|
|
+// V1JobSpec job = yaml.getSpec();
|
|
|
+// job.setCompletions(completions);
|
|
|
+// job.setParallelism(parallelism);
|
|
|
+// //5 pod
|
|
|
+// V1PodSpec v1PodSpec = job.getTemplate().getSpec();
|
|
|
+// //6 container
|
|
|
+// List<V1Container> containers = v1PodSpec.getContainers();
|
|
|
+// for (V1Container container : containers) {
|
|
|
+// String name = container.getName();
|
|
|
+// if ("vtd".equals(name)) {
|
|
|
+// container.setName("vtd_" + projectId);
|
|
|
+// }
|
|
|
+// if ("algorithm".equals(name)) {
|
|
|
+// container.setName("algorithm_" + projectId);
|
|
|
+// container.setImage(dockerImage);
|
|
|
+// }
|
|
|
+// }
|
|
|
+// //4-4 创建
|
|
|
+// yaml.setSpec(job);
|
|
|
+// batchV1Api.createNamespacedJob("simulation-task", yaml, null, null, null);
|
|
|
+//
|
|
|
+// }
|
|
|
+
|
|
|
+
|
|
|
}
|