|
@@ -310,51 +310,43 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
|
|
final String newId = StringUtil.getRandomUUID();
|
|
final String newId = StringUtil.getRandomUUID();
|
|
simulationManualProjectPO.setNewId(newId);
|
|
simulationManualProjectPO.setNewId(newId);
|
|
po.setId(newId);
|
|
po.setId(newId);
|
|
|
|
+ param.setId(newId);
|
|
simulationProjectMapper.updateIdById(simulationManualProjectPO); //此处注意事务
|
|
simulationProjectMapper.updateIdById(simulationManualProjectPO); //此处注意事务
|
|
}
|
|
}
|
|
- // 设置开始时间
|
|
|
|
param.setStartTime(new Date());
|
|
param.setStartTime(new Date());
|
|
- // Kafka推送消息
|
|
|
|
|
|
+ simulationProjectMapper.updateProjectNowRunState(param);
|
|
projectRunToKafka(po);
|
|
projectRunToKafka(po);
|
|
- } else if (DictConstants.PROJECT_COMPLETED.equals(param.getNowRunState())) {
|
|
|
|
- // 设置完成时间
|
|
|
|
- param.setFinishTime(new Date());
|
|
|
|
}
|
|
}
|
|
- simulationProjectMapper.updateProjectNowRunState(param);
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return new ResponseBodyVO<>(ResponseBodyVO.Response.SUCCESS);
|
|
return new ResponseBodyVO<>(ResponseBodyVO.Response.SUCCESS);
|
|
}
|
|
}
|
|
|
|
|
|
private void projectRunToKafka(SimulationManualProjectPO po) {
|
|
private void projectRunToKafka(SimulationManualProjectPO po) {
|
|
- try {
|
|
|
|
- log.info("准备发送项目消息:" + po);
|
|
|
|
- SimulationManualProjectKafkaParam kafkaParam = new SimulationManualProjectKafkaParam();
|
|
|
|
- kafkaParam.setProjectId(po.getId());
|
|
|
|
- kafkaParam.setAlgorithmId(po.getAlgorithm());
|
|
|
|
- kafkaParam.setVehicleConfigId(po.getVehicle());
|
|
|
|
- kafkaParam.setScenePackageId(po.getScene());
|
|
|
|
- kafkaParam.setMaxSimulationTime(po.getMaxSimulationTime());
|
|
|
|
- kafkaParam.setParallelism(Integer.valueOf(po.getParallelism()));
|
|
|
|
- kafkaParam.setType(DictConstants.PROJECT_TYPE_MANUAL);
|
|
|
|
- kafkaParam.setModelType(vehicleMapper.selectParameterTypeById(po.getVehicle()));
|
|
|
|
- KafkaParameter kafkaParameter = new KafkaParameter();
|
|
|
|
- kafkaParameter.setTopic(ProjectConstants.RUN_TASK_TOPIC);
|
|
|
|
- String data = JsonUtil.beanToJson(kafkaParam);
|
|
|
|
- kafkaParameter.setData(data);
|
|
|
|
- log.info("推送项目运行消息到kafka:" + data);
|
|
|
|
- kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
|
|
|
|
- // 消息发送到的topic
|
|
|
|
- String topic = success.getRecordMetadata().topic();
|
|
|
|
- // 消息发送到的分区
|
|
|
|
- int partition = success.getRecordMetadata().partition();
|
|
|
|
- // 消息在分区内的offset
|
|
|
|
- long offset = success.getRecordMetadata().offset();
|
|
|
|
- log.info("发送消息成功,主题 topic 为:" + topic + ",分区 partition 为:" + partition + ",偏移量为:" + offset + ",消息体为:" + kafkaParameter.getData());
|
|
|
|
- }, failure -> log.error("发送消息失败:" + failure.getMessage()));
|
|
|
|
- } catch (NumberFormatException e) {
|
|
|
|
- throw new RuntimeException(e);
|
|
|
|
- }
|
|
|
|
|
|
+ log.info("准备发送项目消息:" + po);
|
|
|
|
+ SimulationManualProjectKafkaParam kafkaParam = new SimulationManualProjectKafkaParam();
|
|
|
|
+ kafkaParam.setProjectId(po.getId());
|
|
|
|
+ kafkaParam.setAlgorithmId(po.getAlgorithm());
|
|
|
|
+ kafkaParam.setVehicleConfigId(po.getVehicle());
|
|
|
|
+ kafkaParam.setScenePackageId(po.getScene());
|
|
|
|
+ kafkaParam.setMaxSimulationTime(po.getMaxSimulationTime());
|
|
|
|
+ kafkaParam.setParallelism(Integer.valueOf(po.getParallelism()));
|
|
|
|
+ kafkaParam.setType(DictConstants.PROJECT_TYPE_MANUAL);
|
|
|
|
+ kafkaParam.setModelType(vehicleMapper.selectParameterTypeById(po.getVehicle()));
|
|
|
|
+ KafkaParameter kafkaParameter = new KafkaParameter();
|
|
|
|
+ kafkaParameter.setTopic(ProjectConstants.RUN_TASK_TOPIC);
|
|
|
|
+ String data = JsonUtil.beanToJson(kafkaParam);
|
|
|
|
+ kafkaParameter.setData(data);
|
|
|
|
+ log.info("推送项目运行消息到 Kafka:" + data);
|
|
|
|
+ kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
|
|
|
|
+ // 消息发送到的topic
|
|
|
|
+ String topic = success.getRecordMetadata().topic();
|
|
|
|
+ // 消息发送到的分区
|
|
|
|
+ int partition = success.getRecordMetadata().partition();
|
|
|
|
+ // 消息在分区内的offset
|
|
|
|
+ long offset = success.getRecordMetadata().offset();
|
|
|
|
+ log.info("发送消息成功,主题 topic 为:" + topic + ",分区 partition 为:" + partition + ",偏移量为:" + offset + ",消息体为:" + kafkaParameter.getData());
|
|
|
|
+ }, failure -> log.error("发送消息失败:" + failure.getMessage()));
|
|
}
|
|
}
|
|
|
|
|
|
private void projectStopToKafka(SimulationManualProjectPO po) {
|
|
private void projectStopToKafka(SimulationManualProjectPO po) {
|
|
@@ -398,6 +390,12 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
|
|
}, failure -> log.error("发送消息失败:" + failure.getMessage()));
|
|
}, failure -> log.error("发送消息失败:" + failure.getMessage()));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 查询项目详情,不使用详情字段中的某些字段,例如项目运行状态,并查询任务列表
|
|
|
|
+ *
|
|
|
|
+ * @param param
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
@Override
|
|
@Override
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
public ResponseBodyVO<ProjectDetailsVo> selectProjectDetailsById(SimulationManualProjectParam param) {
|
|
public ResponseBodyVO<ProjectDetailsVo> selectProjectDetailsById(SimulationManualProjectParam param) {
|