|
@@ -0,0 +1,93 @@
|
|
|
|
+package com.css.simulation.resource.scheduler.consumer;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+import api.common.util.JsonUtil;
|
|
|
|
+import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
|
|
|
|
+import com.css.simulation.resource.scheduler.mapper.SensorMapper;
|
|
|
|
+import com.css.simulation.resource.scheduler.mapper.TaskMapper;
|
|
|
|
+import com.css.simulation.resource.scheduler.pojo.dto.task.*;
|
|
|
|
+import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
|
|
|
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
|
+import org.springframework.boot.configurationprocessor.json.JSONException;
|
|
|
|
+import org.springframework.boot.configurationprocessor.json.JSONObject;
|
|
|
|
+import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
+
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
+import java.util.List;
|
|
|
|
+
|
|
|
|
+@Component
|
|
|
|
+public class ProjectConsumer {
|
|
|
|
+
|
|
|
|
+ @Resource
|
|
|
|
+ ProjectMapper projectMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ TaskMapper taskMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ SensorMapper sensorMapper;
|
|
|
|
+ @Resource
|
|
|
|
+ private KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
+
|
|
|
|
+ @KafkaListener(topics = "project")
|
|
|
|
+ public void parseProject(ConsumerRecord<String, String> projectRecord) throws JsonProcessingException, JSONException {
|
|
|
|
+ //1 读取 kafka 的 project 信息
|
|
|
|
+ String projectJson = projectRecord.value();
|
|
|
|
+ JSONObject projectJsonObject = JsonUtil.jsonToJSONObject(projectJson);
|
|
|
|
+ String projectId = projectJsonObject.optString("projectId"); // 项目 id
|
|
|
|
+ String defaultTime = projectJsonObject.optString("defaultTime"); // 任务执行时间,一个工作里的所有任务公用
|
|
|
|
+ String parallelism = projectJsonObject.optString("parallelism"); // 并行度,创建 pod 时使用
|
|
|
|
+ //2 修改该 project 的状态为执行中
|
|
|
|
+ projectMapper.updateProjectState(projectId, "2");
|
|
|
|
+ //3 根据 project 信息获取 task 信息。task 信息包括算法、模型、场景数据包等
|
|
|
|
+ List<TaskPO> taskPOList = taskMapper.selectByProjectId(projectId);
|
|
|
|
+ //4 组装 task 消息
|
|
|
|
+ for (TaskPO taskPO : taskPOList) {
|
|
|
|
+ //4-1 组装 task 消息
|
|
|
|
+ TaskDTO taskDTO = TaskDTO.builder()
|
|
|
|
+ .info(InfoDTO.builder()
|
|
|
|
+ .project_id(projectId)
|
|
|
|
+ .task_id(taskPO.getId())
|
|
|
|
+ .task_path(taskPO.getResultPath())
|
|
|
|
+ .build())
|
|
|
|
+ .scenario(ScenarioDTO.builder()
|
|
|
|
+ .scenario_osc(taskPO.getScenarioOsc())
|
|
|
|
+ .scenario_odr(taskPO.getScenarioOdr())
|
|
|
|
+ .scenario_osgb(taskPO.getScenarioOsgb())
|
|
|
|
+ .build())
|
|
|
|
+ .vehicle(VehicleDTO.builder()
|
|
|
|
+ .model(ModelDTO.builder().model_label(taskPO.getModelLabel()).build())
|
|
|
|
+ .dynamics(DynamicsDTO.builder()
|
|
|
|
+ .dynamics_maxspeed(taskPO.getMaxSpeed())
|
|
|
|
+ .dynamics_enginepower(taskPO.getEnginePower())
|
|
|
|
+ .dynamics_maxdecel(taskPO.getMaxDeceleration())
|
|
|
|
+ .dynamics_maxsteering(taskPO.getMaxSteeringAngle())
|
|
|
|
+ .dynamics_mass(taskPO.getMass())
|
|
|
|
+ .dynamics_frontsurfaceeffective(taskPO.getFrontSurfaceEffective())
|
|
|
|
+ .dynamics_airdragcoefficient(taskPO.getAirDragCoefficient())
|
|
|
|
+ .dynamics_rollingresistance(taskPO.getRollingResistanceCoefficient())
|
|
|
|
+ .dynamics_wheeldiameter(taskPO.getWheelDiameter())
|
|
|
|
+ .dynamics_wheeldrive(taskPO.getDrive())
|
|
|
|
+ .dynamics_overallefficiency(taskPO.getOverallEfficiency())
|
|
|
|
+ .dynamics_distfront(taskPO.getFrontDistance())
|
|
|
|
+ .dynamics_distrear(taskPO.getRearDistance())
|
|
|
|
+ .dynamics_distleft(taskPO.getLeftDistance())
|
|
|
|
+ .dynamics_distright(taskPO.getRightDistance())
|
|
|
|
+ .dynamics_distheight(taskPO.getHeight())
|
|
|
|
+ .dynamics_wheelbase(taskPO.getDynamicsWheelbase())
|
|
|
|
+ .build())
|
|
|
|
+ .sensors(SensorsDTO.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
|
|
+ .camera(sensorMapper.selectCameraByVehicleId(taskPO.getVehicleId()))
|
|
|
|
+ .OGT(sensorMapper.selectOgtByVehicleId(taskPO.getVehicleId()))
|
|
|
|
+ .build())
|
|
|
|
+ .build())
|
|
|
|
+ .build();
|
|
|
|
+ //4-4 将对象转成 json
|
|
|
|
+ String taskJson = JsonUtil.beanToJson(taskDTO);
|
|
|
|
+ //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
|
|
|
|
+ kafkaTemplate.send(projectId, 0, taskPO.getId(), taskJson);
|
|
|
|
+ }
|
|
|
|
+ //4 创建 pod 开始执行。
|
|
|
|
+ }
|
|
|
|
+}
|