|
@@ -1,6 +1,7 @@
|
|
package com.css.simulation.resource.scheduler.consumer;
|
|
package com.css.simulation.resource.scheduler.consumer;
|
|
|
|
|
|
|
|
|
|
|
|
+import api.common.pojo.constants.DictConstants;
|
|
import api.common.pojo.dto.ProjectMessageDTO;
|
|
import api.common.pojo.dto.ProjectMessageDTO;
|
|
import api.common.util.JsonUtil;
|
|
import api.common.util.JsonUtil;
|
|
import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
|
|
import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
|
|
@@ -9,9 +10,9 @@ import com.css.simulation.resource.scheduler.mapper.TaskMapper;
|
|
import com.css.simulation.resource.scheduler.pojo.dto.task.*;
|
|
import com.css.simulation.resource.scheduler.pojo.dto.task.*;
|
|
import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
|
|
import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
-import org.springframework.boot.configurationprocessor.json.JSONException;
|
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
@@ -19,6 +20,7 @@ import org.springframework.stereotype.Component;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
|
|
@Component
|
|
@Component
|
|
|
|
+@Slf4j
|
|
public class ProjectConsumer {
|
|
public class ProjectConsumer {
|
|
|
|
|
|
@Autowired
|
|
@Autowired
|
|
@@ -35,32 +37,54 @@ public class ProjectConsumer {
|
|
System.out.println("------- 消费成功:" + projectRecord.value());
|
|
System.out.println("------- 消费成功:" + projectRecord.value());
|
|
}
|
|
}
|
|
|
|
|
|
- @KafkaListener(groupId = "simulation-resource-scheduler", topics = "project")
|
|
|
|
- public void parseProject(ConsumerRecord<String, String> projectRecord) throws JsonProcessingException, JSONException {
|
|
|
|
|
|
+ @KafkaListener(groupId = "simulation-resource-scheduler", topics = "manualProject")
|
|
|
|
+ public void parseProject(ConsumerRecord<String, String> projectRecord) throws JsonProcessingException {
|
|
//1 读取 kafka 的 project 信息
|
|
//1 读取 kafka 的 project 信息
|
|
String projectJson = projectRecord.value();
|
|
String projectJson = projectRecord.value();
|
|
|
|
|
|
ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
- String projectId = projectMessageDTO.getProjectId();
|
|
|
|
- String defaultTime = projectMessageDTO.getDefaultTime();
|
|
|
|
- String parallelism = projectMessageDTO.getParallelism();
|
|
|
|
|
|
+ String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
|
|
+ String taskMaxExecuteTime = projectMessageDTO.getTaskMaxExecuteTime(); // 任务最大执行时间
|
|
|
|
+ String parallelism = projectMessageDTO.getParallelism(); // 并行度
|
|
//2 修改该 project 的状态为执行中
|
|
//2 修改该 project 的状态为执行中
|
|
projectMapper.updateProjectState(projectId, "2");
|
|
projectMapper.updateProjectState(projectId, "2");
|
|
//3 根据 project 信息获取 task 信息。task 信息包括算法、模型、场景数据包等
|
|
//3 根据 project 信息获取 task 信息。task 信息包括算法、模型、场景数据包等
|
|
List<TaskPO> taskPOList = taskMapper.selectByProjectId(projectId);
|
|
List<TaskPO> taskPOList = taskMapper.selectByProjectId(projectId);
|
|
//4 组装 task 消息
|
|
//4 组装 task 消息
|
|
for (TaskPO taskPO : taskPOList) {
|
|
for (TaskPO taskPO : taskPOList) {
|
|
|
|
+ String resultPath = "/project/manual-project/" + projectId + "/" + taskPO.getId();
|
|
|
|
+ String scenarioOsc;
|
|
|
|
+ String scenarioOdr;
|
|
|
|
+ String scenarioOsgb;
|
|
|
|
+ if (DictConstants.SCENE_NATURAL.equals(taskPO.getSceneType())) {
|
|
|
|
+ scenarioOsc = taskPO.getSnScenarioOsc();
|
|
|
|
+ scenarioOdr = taskPO.getSnScenarioOsc();
|
|
|
|
+ scenarioOsgb = taskPO.getSnScenarioOsgb();
|
|
|
|
+ } else if (DictConstants.SCENE_STANDARD.equals(taskPO.getSceneType())) {
|
|
|
|
+ scenarioOsc = taskPO.getScrScenarioOsc();
|
|
|
|
+ scenarioOdr = taskPO.getScrScenarioOsc();
|
|
|
|
+ scenarioOsgb = taskPO.getScrScenarioOsgb();
|
|
|
|
+ } else if (DictConstants.SCENE_ACCIDENT.equals(taskPO.getSceneType())) {
|
|
|
|
+ scenarioOsc = taskPO.getSaScenarioOsc();
|
|
|
|
+ scenarioOdr = taskPO.getSaScenarioOsc();
|
|
|
|
+ scenarioOsgb = taskPO.getSaScenarioOsgb();
|
|
|
|
+ } else {
|
|
|
|
+ log.error("------- 不存在场景类型:" + taskPO.getSceneType());
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ String vehicleId = taskPO.getVehicleId();
|
|
|
|
+
|
|
//4-1 组装 task 消息
|
|
//4-1 组装 task 消息
|
|
TaskDTO taskDTO = TaskDTO.builder()
|
|
TaskDTO taskDTO = TaskDTO.builder()
|
|
.info(InfoDTO.builder()
|
|
.info(InfoDTO.builder()
|
|
.project_id(projectId)
|
|
.project_id(projectId)
|
|
.task_id(taskPO.getId())
|
|
.task_id(taskPO.getId())
|
|
- .task_path(taskPO.getResultPath())
|
|
|
|
|
|
+ .task_path(resultPath)
|
|
.build())
|
|
.build())
|
|
.scenario(ScenarioDTO.builder()
|
|
.scenario(ScenarioDTO.builder()
|
|
- .scenario_osc(taskPO.getScenarioOsc())
|
|
|
|
- .scenario_odr(taskPO.getScenarioOdr())
|
|
|
|
- .scenario_osgb(taskPO.getScenarioOsgb())
|
|
|
|
|
|
+ .scenario_osc(scenarioOsc)
|
|
|
|
+ .scenario_odr(scenarioOdr)
|
|
|
|
+ .scenario_osgb(scenarioOsgb)
|
|
.build())
|
|
.build())
|
|
.vehicle(VehicleDTO.builder()
|
|
.vehicle(VehicleDTO.builder()
|
|
.model(ModelDTO.builder()
|
|
.model(ModelDTO.builder()
|
|
@@ -76,14 +100,14 @@ public class ProjectConsumer {
|
|
.dynamics_airdragcoefficient(taskPO.getAirDragCoefficient())
|
|
.dynamics_airdragcoefficient(taskPO.getAirDragCoefficient())
|
|
.dynamics_rollingresistance(taskPO.getRollingResistanceCoefficient())
|
|
.dynamics_rollingresistance(taskPO.getRollingResistanceCoefficient())
|
|
.dynamics_wheeldiameter(taskPO.getWheelDiameter())
|
|
.dynamics_wheeldiameter(taskPO.getWheelDiameter())
|
|
- .dynamics_wheeldrive(taskPO.getDrive())
|
|
|
|
|
|
+ .dynamics_wheeldrive(taskPO.getWheelDrive())
|
|
.dynamics_overallefficiency(taskPO.getOverallEfficiency())
|
|
.dynamics_overallefficiency(taskPO.getOverallEfficiency())
|
|
.dynamics_distfront(taskPO.getFrontDistance())
|
|
.dynamics_distfront(taskPO.getFrontDistance())
|
|
.dynamics_distrear(taskPO.getRearDistance())
|
|
.dynamics_distrear(taskPO.getRearDistance())
|
|
.dynamics_distleft(taskPO.getLeftDistance())
|
|
.dynamics_distleft(taskPO.getLeftDistance())
|
|
.dynamics_distright(taskPO.getRightDistance())
|
|
.dynamics_distright(taskPO.getRightDistance())
|
|
- .dynamics_distheight(taskPO.getHeight())
|
|
|
|
- .dynamics_wheelbase(taskPO.getDynamicsWheelbase())
|
|
|
|
|
|
+ .dynamics_distheight(taskPO.getHeightDistance())
|
|
|
|
+ .dynamics_wheelbase(taskPO.getWheelbase())
|
|
.build())
|
|
.build())
|
|
.sensors(SensorsDTO.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
.sensors(SensorsDTO.builder() // 根据 vehicleId 查询绑定的传感器列表
|
|
.camera(sensorMapper.selectCameraByVehicleId(taskPO.getVehicleId()))
|
|
.camera(sensorMapper.selectCameraByVehicleId(taskPO.getVehicleId()))
|