|
@@ -0,0 +1,227 @@
|
|
|
+package com.css.simulation.resource.scheduler.consumer;
|
|
|
+
|
|
|
+
|
|
|
+import api.common.pojo.constants.DictConstants;
|
|
|
+import api.common.pojo.dto.ProjectMessageDTO;
|
|
|
+import api.common.pojo.param.KafkaParameter;
|
|
|
+import api.common.pojo.param.MinioParameter;
|
|
|
+import api.common.util.FileUtil;
|
|
|
+import api.common.util.JsonUtil;
|
|
|
+import api.common.util.LinuxUtil;
|
|
|
+import api.common.util.StringUtil;
|
|
|
+import com.css.simulation.resource.scheduler.feign.CommonService;
|
|
|
+import com.css.simulation.resource.scheduler.mapper.*;
|
|
|
+import com.css.simulation.resource.scheduler.pojo.dto.*;
|
|
|
+import com.css.simulation.resource.scheduler.pojo.po.*;
|
|
|
+import feign.Response;
|
|
|
+import io.kubernetes.client.openapi.ApiClient;
|
|
|
+import io.kubernetes.client.openapi.ApiException;
|
|
|
+import io.kubernetes.client.openapi.apis.BatchV1Api;
|
|
|
+import io.kubernetes.client.openapi.models.*;
|
|
|
+import io.kubernetes.client.util.Yaml;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.kafka.annotation.KafkaListener;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.ResourceUtils;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+public class ManualProjectConsumer {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ ProjectMapper projectMapper;
|
|
|
+ @Autowired
|
|
|
+ IndexMapper indexMapper;
|
|
|
+ @Autowired
|
|
|
+ SceneMapper sceneMapper;
|
|
|
+ @Autowired
|
|
|
+ VehicleMapper vehicleMapper;
|
|
|
+ @Autowired
|
|
|
+ TaskMapper taskMapper;
|
|
|
+ @Autowired
|
|
|
+ SensorCameraMapper sensorCameraMapper;
|
|
|
+ @Autowired
|
|
|
+ SensorOgtMapper sensorOgtMapper;
|
|
|
+ @Autowired
|
|
|
+ private CommonService commonService;
|
|
|
+ @Autowired
|
|
|
+ private AlgorithmMapper algorithmMapper;
|
|
|
+ @Autowired
|
|
|
+ private ApiClient apiClient;
|
|
|
+
|
|
|
+ @KafkaListener(groupId = "simulation-resource-scheduler", topics = "test")
|
|
|
+ public void testConsumer(ConsumerRecord<String, String> projectRecord) {
|
|
|
+ System.out.println("------- 消费成功:" + projectRecord.value());
|
|
|
+ }
|
|
|
+
|
|
|
+ @KafkaListener(groupId = "simulation-resource-scheduler", topics = "manualProject")
|
|
|
+ public void parseProject(ConsumerRecord<String, String> projectRecord) throws IOException, ApiException {
|
|
|
+ //1 读取 kafka 的 project 信息
|
|
|
+ String projectJson = projectRecord.value();
|
|
|
+ ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
|
+ String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
+ projectMapper.updateProjectState(projectId, "2"); // 修改该 project 的状态为执行中
|
|
|
+
|
|
|
+
|
|
|
+ // -------------------------------- 1 场景 --------------------------------
|
|
|
+ // 根据 packageId,拿到场景 id,然后获取每个场景的文件地址。
|
|
|
+ String packageId = projectMessageDTO.getScenePackageId();
|
|
|
+ List<IndexPO> leafIndexList = indexMapper.selectLeafIndexByPackageId(packageId);
|
|
|
+ List<String> naturalIdList = new ArrayList<>();
|
|
|
+ List<String> standardIdList = new ArrayList<>();
|
|
|
+ List<String> accidentIdList = new ArrayList<>();
|
|
|
+ for (IndexPO indexPO : leafIndexList) {
|
|
|
+ String naturalIds = indexPO.getSceneNaturalIds();
|
|
|
+ String standardIds = indexPO.getSceneStatueIds();
|
|
|
+ String accidentIds = indexPO.getSceneTrafficIds();
|
|
|
+ if (StringUtil.isNotEmpty(naturalIds)) {
|
|
|
+ String[] naturalIdArray = naturalIds.split(",");
|
|
|
+ naturalIdList.addAll(Arrays.asList(naturalIdArray));
|
|
|
+ }
|
|
|
+ if (StringUtil.isNotEmpty(standardIds)) {
|
|
|
+ String[] standardArray = standardIds.split(",");
|
|
|
+ standardIdList.addAll(Arrays.asList(standardArray));
|
|
|
+ }
|
|
|
+ if (StringUtil.isNotEmpty(accidentIds)) {
|
|
|
+ String[] accidentIdArray = accidentIds.split(",");
|
|
|
+ accidentIdList.addAll(Arrays.asList(accidentIdArray));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ List<ScenePO> sceneList = new ArrayList<>();
|
|
|
+ sceneList.addAll(sceneMapper.selectNaturalByIdList(naturalIdList));
|
|
|
+ sceneList.addAll(sceneMapper.selectStandardByIdList(standardIdList));
|
|
|
+ sceneList.addAll(sceneMapper.selectAccidentByIdList(accidentIdList));
|
|
|
+
|
|
|
+ // -------------------------------- 2 模型 --------------------------------
|
|
|
+ // 根据 vehicleId, 获取 模型信息和传感器信息
|
|
|
+ String vehicleId = projectMessageDTO.getVehicleId(); // 模型 id
|
|
|
+ VehiclePO vehiclePO = vehicleMapper.selectByVehicleId(projectId);
|
|
|
+ List<CameraPO> cameraPOList = sensorCameraMapper.selectCameraByVehicleId(vehicleId);
|
|
|
+ List<OgtPO> ogtPOList = sensorOgtMapper.selectOgtByVehicleId(vehicleId);
|
|
|
+
|
|
|
+ // -------------------------------- 3 任务消息 --------------------------------
|
|
|
+ // 根据场景创建任务,组装 task 消息
|
|
|
+ int maxSimulationTime = projectMessageDTO.getMaxSimulationTime();
|
|
|
+ for (ScenePO scenePO : sceneList) {
|
|
|
+ String taskId = StringUtil.getRandomUUID();
|
|
|
+ String resultPath = "/project/manual-project/" + projectId + "/" + taskId;
|
|
|
+ TaskPO taskPO = TaskPO.builder()
|
|
|
+ .id(taskId)
|
|
|
+ .pId(projectId)
|
|
|
+ .sceneId(scenePO.getId())
|
|
|
+ .sceneName(scenePO.getName())
|
|
|
+ .sceneType(scenePO.getType())
|
|
|
+ .runState(DictConstants.TASK_WAITING)
|
|
|
+ .runResult(resultPath)
|
|
|
+ .build();
|
|
|
+ //4-1 组装 task 消息
|
|
|
+ TaskDTO taskDTO = TaskDTO.builder()
|
|
|
+ .info(InfoDTO.builder()
|
|
|
+ .project_id(projectId)
|
|
|
+ .task_id(taskId)
|
|
|
+ .task_path(resultPath)
|
|
|
+ .default_time(maxSimulationTime)
|
|
|
+ .build())
|
|
|
+ .scenario(ScenarioDTO.builder()
|
|
|
+ .scenario_osc(scenePO.getScenarioOsc())
|
|
|
+ .scenario_odr(scenePO.getScenarioOdr())
|
|
|
+ .scenario_osgb(scenePO.getScenarioOsgb())
|
|
|
+ .build())
|
|
|
+ .vehicle(VehicleDTO.builder()
|
|
|
+ .model(ModelDTO.builder()
|
|
|
+ .model_label(vehiclePO.getModelLabel())
|
|
|
+ .build())
|
|
|
+ .dynamics(DynamicsDTO.builder()
|
|
|
+ .dynamics_maxspeed(vehiclePO.getMaxSpeed())
|
|
|
+ .dynamics_enginepower(vehiclePO.getEnginePower())
|
|
|
+ .dynamics_maxdecel(vehiclePO.getMaxDeceleration())
|
|
|
+ .dynamics_maxsteering(vehiclePO.getMaxSteeringAngle())
|
|
|
+ .dynamics_mass(vehiclePO.getMass())
|
|
|
+ .dynamics_frontsurfaceeffective(vehiclePO.getFrontSurfaceEffective())
|
|
|
+ .dynamics_airdragcoefficient(vehiclePO.getAirDragCoefficient())
|
|
|
+ .dynamics_rollingresistance(vehiclePO.getRollingResistanceCoefficient())
|
|
|
+ .dynamics_wheeldiameter(vehiclePO.getWheelDiameter())
|
|
|
+ .dynamics_wheeldrive(vehiclePO.getWheelDrive())
|
|
|
+ .dynamics_overallefficiency(vehiclePO.getOverallEfficiency())
|
|
|
+ .dynamics_distfront(vehiclePO.getFrontDistance())
|
|
|
+ .dynamics_distrear(vehiclePO.getRearDistance())
|
|
|
+ .dynamics_distleft(vehiclePO.getLeftDistance())
|
|
|
+ .dynamics_distright(vehiclePO.getRightDistance())
|
|
|
+ .dynamics_distheight(vehiclePO.getHeightDistance())
|
|
|
+ .dynamics_wheelbase(vehiclePO.getWheelbase())
|
|
|
+ .build())
|
|
|
+ .sensors(SensorsDTO.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
+ .camera(cameraPOList)
|
|
|
+ .OGT(ogtPOList)
|
|
|
+ .build())
|
|
|
+ .build())
|
|
|
+ .build();
|
|
|
+ //4-4 将对象转成 json
|
|
|
+ String taskJson = JsonUtil.beanToJson(taskDTO);
|
|
|
+ //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
|
|
|
+ commonService.send(new KafkaParameter(projectId, taskJson));
|
|
|
+ }
|
|
|
+
|
|
|
+ // -------------------------------- 4 算法(一期按单机版做) --------------------------------
|
|
|
+ // 私有仓库导入算法镜像(搭建私有仓库)
|
|
|
+ String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
|
+ String localPath = "/opt/module/algorithm/";
|
|
|
+ //4-1 根据算法 id 获取算法文件地址
|
|
|
+ String minioPath = algorithmMapper.selectMinioPathById(algorithmId);
|
|
|
+ // 下载算法文件到本地( 2 到仓库服务器)
|
|
|
+ Response response = commonService.download(new MinioParameter(minioPath));
|
|
|
+ InputStream inputStream = response.body().asInputStream();
|
|
|
+ FileUtil.writeInputStreamToLocalFile(inputStream,localPath);
|
|
|
+ //4-2 本地执行 docker load 算法文件成镜像( 2 创建 ssh 连接)
|
|
|
+ LinuxUtil.execute("docker load");
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ // -------------------------------- 5 创建 pod 开始执行 --------------------------------
|
|
|
+ int completions = sceneList.size();
|
|
|
+ int parallelism = projectMessageDTO.getParallelism(); // 并行度
|
|
|
+ BatchV1Api batchV1Api = new BatchV1Api(apiClient);
|
|
|
+ V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
|
|
|
+ //1 apiVersion
|
|
|
+ //2 kind
|
|
|
+ //3 metadata
|
|
|
+ V1ObjectMeta metadata = yaml.getMetadata();
|
|
|
+ metadata.setName("job2");
|
|
|
+ yaml.setMetadata(metadata);
|
|
|
+
|
|
|
+ //4 spec
|
|
|
+ V1JobSpec v1JobSpec = yaml.getSpec();
|
|
|
+ //4-1 job
|
|
|
+ v1JobSpec.setCompletions(completions);
|
|
|
+ v1JobSpec.setParallelism(parallelism);
|
|
|
+ //4-2 pod
|
|
|
+ V1PodTemplateSpec v1PodTemplateSpec = v1JobSpec.getTemplate();
|
|
|
+
|
|
|
+ v1PodTemplateSpec.setMetadata(metadata);
|
|
|
+ V1PodSpec v1PodSpec = v1PodTemplateSpec.getSpec();
|
|
|
+ v1PodSpec.setRestartPolicy("Never");
|
|
|
+ //4-3 container
|
|
|
+ List<V1Container> containers = new ArrayList<>();
|
|
|
+ V1Container v1Container = v1PodSpec.getContainers().get(0);
|
|
|
+ v1Container.setImage("perl");
|
|
|
+ v1Container.setImagePullPolicy("IfNotPresent");
|
|
|
+ v1Container.setName("pi");
|
|
|
+ v1Container.setCommand(Arrays.asList("perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"));
|
|
|
+ containers.add(v1Container);
|
|
|
+ //4-4 创建
|
|
|
+ v1PodSpec.setContainers(containers);
|
|
|
+ v1PodTemplateSpec.setSpec(v1PodSpec);
|
|
|
+ v1JobSpec.setTemplate(v1PodTemplateSpec);
|
|
|
+ yaml.setSpec(v1JobSpec);
|
|
|
+ batchV1Api.createNamespacedJob("test-namespace1", yaml, null, null, null);
|
|
|
+
|
|
|
+ }
|
|
|
+}
|