|
@@ -1,95 +1,95 @@
|
|
-package com.css.simulation.resource.scheduler.consumer;
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-import api.common.util.JsonUtil;
|
|
|
|
-import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
|
|
|
|
-import com.css.simulation.resource.scheduler.mapper.SensorMapper;
|
|
|
|
-import com.css.simulation.resource.scheduler.mapper.TaskMapper;
|
|
|
|
-import com.css.simulation.resource.scheduler.pojo.dto.task.*;
|
|
|
|
-import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
|
|
|
|
-import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
|
-import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
|
-import org.springframework.boot.configurationprocessor.json.JSONException;
|
|
|
|
-import org.springframework.boot.configurationprocessor.json.JSONObject;
|
|
|
|
-import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
-import org.springframework.kafka.core.KafkaTemplate;
|
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
|
-
|
|
|
|
-import javax.annotation.Resource;
|
|
|
|
-import java.util.List;
|
|
|
|
-
|
|
|
|
-@Component
|
|
|
|
-public class ProjectConsumer {
|
|
|
|
-
|
|
|
|
- @Resource
|
|
|
|
- ProjectMapper projectMapper;
|
|
|
|
- @Resource
|
|
|
|
- TaskMapper taskMapper;
|
|
|
|
- @Resource
|
|
|
|
- SensorMapper sensorMapper;
|
|
|
|
- @Resource
|
|
|
|
- private KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
-
|
|
|
|
- @KafkaListener(topics = "project")
|
|
|
|
- public void parseProject(ConsumerRecord<String, String> projectRecord) throws JsonProcessingException, JSONException {
|
|
|
|
- //1 读取 kafka 的 project 信息
|
|
|
|
- String projectJson = projectRecord.value();
|
|
|
|
-
|
|
|
|
- JSONObject projectJsonObject = new JSONObject(projectJson);
|
|
|
|
- String projectId = projectJsonObject.optString("projectId"); // 项目 id
|
|
|
|
- String defaultTime = projectJsonObject.optString("defaultTime"); // 任务执行时间,一个工作里的所有任务公用
|
|
|
|
- String parallelism = projectJsonObject.optString("parallelism"); // 并行度,创建 pod 时使用
|
|
|
|
- //2 修改该 project 的状态为执行中
|
|
|
|
- projectMapper.updateProjectState(projectId, "2");
|
|
|
|
- //3 根据 project 信息获取 task 信息。task 信息包括算法、模型、场景数据包等
|
|
|
|
- List<TaskPO> taskPOList = taskMapper.selectByProjectId(projectId);
|
|
|
|
- //4 组装 task 消息
|
|
|
|
- for (TaskPO taskPO : taskPOList) {
|
|
|
|
- //4-1 组装 task 消息
|
|
|
|
- TaskDTO taskDTO = TaskDTO.builder()
|
|
|
|
- .info(InfoDTO.builder()
|
|
|
|
- .project_id(projectId)
|
|
|
|
- .task_id(taskPO.getId())
|
|
|
|
- .task_path(taskPO.getResultPath())
|
|
|
|
- .build())
|
|
|
|
- .scenario(ScenarioDTO.builder()
|
|
|
|
- .scenario_osc(taskPO.getScenarioOsc())
|
|
|
|
- .scenario_odr(taskPO.getScenarioOdr())
|
|
|
|
- .scenario_osgb(taskPO.getScenarioOsgb())
|
|
|
|
- .build())
|
|
|
|
- .vehicle(VehicleDTO.builder()
|
|
|
|
- .model(ModelDTO.builder().model_label(taskPO.getModelLabel()).build())
|
|
|
|
- .dynamics(DynamicsDTO.builder()
|
|
|
|
- .dynamics_maxspeed(taskPO.getMaxSpeed())
|
|
|
|
- .dynamics_enginepower(taskPO.getEnginePower())
|
|
|
|
- .dynamics_maxdecel(taskPO.getMaxDeceleration())
|
|
|
|
- .dynamics_maxsteering(taskPO.getMaxSteeringAngle())
|
|
|
|
- .dynamics_mass(taskPO.getMass())
|
|
|
|
- .dynamics_frontsurfaceeffective(taskPO.getFrontSurfaceEffective())
|
|
|
|
- .dynamics_airdragcoefficient(taskPO.getAirDragCoefficient())
|
|
|
|
- .dynamics_rollingresistance(taskPO.getRollingResistanceCoefficient())
|
|
|
|
- .dynamics_wheeldiameter(taskPO.getWheelDiameter())
|
|
|
|
- .dynamics_wheeldrive(taskPO.getDrive())
|
|
|
|
- .dynamics_overallefficiency(taskPO.getOverallEfficiency())
|
|
|
|
- .dynamics_distfront(taskPO.getFrontDistance())
|
|
|
|
- .dynamics_distrear(taskPO.getRearDistance())
|
|
|
|
- .dynamics_distleft(taskPO.getLeftDistance())
|
|
|
|
- .dynamics_distright(taskPO.getRightDistance())
|
|
|
|
- .dynamics_distheight(taskPO.getHeight())
|
|
|
|
- .dynamics_wheelbase(taskPO.getDynamicsWheelbase())
|
|
|
|
- .build())
|
|
|
|
- .sensors(SensorsDTO.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
|
- .camera(sensorMapper.selectCameraByVehicleId(taskPO.getVehicleId()))
|
|
|
|
- .OGT(sensorMapper.selectOgtByVehicleId(taskPO.getVehicleId()))
|
|
|
|
- .build())
|
|
|
|
- .build())
|
|
|
|
- .build();
|
|
|
|
- //4-4 将对象转成 json
|
|
|
|
- String taskJson = JsonUtil.beanToJson(taskDTO);
|
|
|
|
- //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
|
|
|
|
- kafkaTemplate.send(projectId, 0, taskPO.getId(), taskJson);
|
|
|
|
- }
|
|
|
|
- //4 创建 pod 开始执行。
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
|
|
+//package com.css.simulation.resource.scheduler.consumer;
|
|
|
|
+//
|
|
|
|
+//
|
|
|
|
+//import api.common.util.JsonUtil;
|
|
|
|
+//import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
|
|
|
|
+//import com.css.simulation.resource.scheduler.mapper.SensorMapper;
|
|
|
|
+//import com.css.simulation.resource.scheduler.mapper.TaskMapper;
|
|
|
|
+//import com.css.simulation.resource.scheduler.pojo.dto.task.*;
|
|
|
|
+//import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
|
|
|
|
+//import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
|
+//import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
|
+//import org.springframework.boot.configurationprocessor.json.JSONException;
|
|
|
|
+//import org.springframework.boot.configurationprocessor.json.JSONObject;
|
|
|
|
+//import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
+//import org.springframework.kafka.core.KafkaTemplate;
|
|
|
|
+//import org.springframework.stereotype.Component;
|
|
|
|
+//
|
|
|
|
+//import javax.annotation.Resource;
|
|
|
|
+//import java.util.List;
|
|
|
|
+//
|
|
|
|
+//@Component
|
|
|
|
+//public class ProjectConsumer {
|
|
|
|
+//
|
|
|
|
+// @Resource
|
|
|
|
+// ProjectMapper projectMapper;
|
|
|
|
+// @Resource
|
|
|
|
+// TaskMapper taskMapper;
|
|
|
|
+// @Resource
|
|
|
|
+// SensorMapper sensorMapper;
|
|
|
|
+// @Resource
|
|
|
|
+// private KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
+//
|
|
|
|
+// @KafkaListener(topics = "project")
|
|
|
|
+// public void parseProject(ConsumerRecord<String, String> projectRecord) throws JsonProcessingException, JSONException {
|
|
|
|
+// //1 读取 kafka 的 project 信息
|
|
|
|
+// String projectJson = projectRecord.value();
|
|
|
|
+//
|
|
|
|
+// JSONObject projectJsonObject = new JSONObject(projectJson);
|
|
|
|
+// String projectId = projectJsonObject.optString("projectId"); // 项目 id
|
|
|
|
+// String defaultTime = projectJsonObject.optString("defaultTime"); // 任务执行时间,一个工作里的所有任务公用
|
|
|
|
+// String parallelism = projectJsonObject.optString("parallelism"); // 并行度,创建 pod 时使用
|
|
|
|
+// //2 修改该 project 的状态为执行中
|
|
|
|
+// projectMapper.updateProjectState(projectId, "2");
|
|
|
|
+// //3 根据 project 信息获取 task 信息。task 信息包括算法、模型、场景数据包等
|
|
|
|
+// List<TaskPO> taskPOList = taskMapper.selectByProjectId(projectId);
|
|
|
|
+// //4 组装 task 消息
|
|
|
|
+// for (TaskPO taskPO : taskPOList) {
|
|
|
|
+// //4-1 组装 task 消息
|
|
|
|
+// TaskDTO taskDTO = TaskDTO.builder()
|
|
|
|
+// .info(InfoDTO.builder()
|
|
|
|
+// .project_id(projectId)
|
|
|
|
+// .task_id(taskPO.getId())
|
|
|
|
+// .task_path(taskPO.getResultPath())
|
|
|
|
+// .build())
|
|
|
|
+// .scenario(ScenarioDTO.builder()
|
|
|
|
+// .scenario_osc(taskPO.getScenarioOsc())
|
|
|
|
+// .scenario_odr(taskPO.getScenarioOdr())
|
|
|
|
+// .scenario_osgb(taskPO.getScenarioOsgb())
|
|
|
|
+// .build())
|
|
|
|
+// .vehicle(VehicleDTO.builder()
|
|
|
|
+// .model(ModelDTO.builder().model_label(taskPO.getModelLabel()).build())
|
|
|
|
+// .dynamics(DynamicsDTO.builder()
|
|
|
|
+// .dynamics_maxspeed(taskPO.getMaxSpeed())
|
|
|
|
+// .dynamics_enginepower(taskPO.getEnginePower())
|
|
|
|
+// .dynamics_maxdecel(taskPO.getMaxDeceleration())
|
|
|
|
+// .dynamics_maxsteering(taskPO.getMaxSteeringAngle())
|
|
|
|
+// .dynamics_mass(taskPO.getMass())
|
|
|
|
+// .dynamics_frontsurfaceeffective(taskPO.getFrontSurfaceEffective())
|
|
|
|
+// .dynamics_airdragcoefficient(taskPO.getAirDragCoefficient())
|
|
|
|
+// .dynamics_rollingresistance(taskPO.getRollingResistanceCoefficient())
|
|
|
|
+// .dynamics_wheeldiameter(taskPO.getWheelDiameter())
|
|
|
|
+// .dynamics_wheeldrive(taskPO.getDrive())
|
|
|
|
+// .dynamics_overallefficiency(taskPO.getOverallEfficiency())
|
|
|
|
+// .dynamics_distfront(taskPO.getFrontDistance())
|
|
|
|
+// .dynamics_distrear(taskPO.getRearDistance())
|
|
|
|
+// .dynamics_distleft(taskPO.getLeftDistance())
|
|
|
|
+// .dynamics_distright(taskPO.getRightDistance())
|
|
|
|
+// .dynamics_distheight(taskPO.getHeight())
|
|
|
|
+// .dynamics_wheelbase(taskPO.getDynamicsWheelbase())
|
|
|
|
+// .build())
|
|
|
|
+// .sensors(SensorsDTO.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
|
+// .camera(sensorMapper.selectCameraByVehicleId(taskPO.getVehicleId()))
|
|
|
|
+// .OGT(sensorMapper.selectOgtByVehicleId(taskPO.getVehicleId()))
|
|
|
|
+// .build())
|
|
|
|
+// .build())
|
|
|
|
+// .build();
|
|
|
|
+// //4-4 将对象转成 json
|
|
|
|
+// String taskJson = JsonUtil.beanToJson(taskDTO);
|
|
|
|
+// //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
|
|
|
|
+// kafkaTemplate.send(projectId, 0, taskPO.getId(), taskJson);
|
|
|
|
+// }
|
|
|
|
+// //4 创建 pod 开始执行。
|
|
|
|
+//
|
|
|
|
+// }
|
|
|
|
+//}
|