|
@@ -265,8 +265,7 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
|
|
|
simulationProjectMapper.updateProjectNowRunState(param);
|
|
|
} else {
|
|
|
//3 校验项目的信息是否可用
|
|
|
- projectUtil.checkProject(DictConstants.PROJECT_TYPE_MANUAL, po.getId(), po.getAlgorithm(), po.getVehicle(),
|
|
|
- po.getScene());
|
|
|
+ projectUtil.checkProject(po.getAlgorithm(), po.getVehicle(), po.getScene());
|
|
|
// 已经完成的项目再次运行
|
|
|
if (DictConstants.PROJECT_COMPLETED.equals(po.getNowRunState())) {
|
|
|
po.createPo(AuthUtil.getCurrentUserId());
|
|
@@ -314,7 +313,7 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
|
|
|
|
|
|
private void projectRunToKafka(SimulationManualProjectPo po) {
|
|
|
try {
|
|
|
- log.info("projectRunToKafka() 准备发送项目消息:" + po);
|
|
|
+ log.info("准备发送项目消息:" + po);
|
|
|
SimulationManualProjectKafkaParam kafkaParam = new SimulationManualProjectKafkaParam();
|
|
|
kafkaParam.setProjectId(po.getId());
|
|
|
kafkaParam.setAlgorithmId(po.getAlgorithm());
|
|
@@ -328,7 +327,7 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
|
|
|
kafkaParameter.setTopic(ProjectConstants.RUN_TASK_TOPIC);
|
|
|
String data = JsonUtil.beanToJson(kafkaParam);
|
|
|
kafkaParameter.setData(data);
|
|
|
- log.info("projectRunToKafka() 推送项目运行消息到kafka:" + data);
|
|
|
+ log.info("推送项目运行消息到kafka:" + data);
|
|
|
kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
|
|
|
// 消息发送到的topic
|
|
|
String topic = success.getRecordMetadata().topic();
|
|
@@ -336,10 +335,8 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
|
|
|
int partition = success.getRecordMetadata().partition();
|
|
|
// 消息在分区内的offset
|
|
|
long offset = success.getRecordMetadata().offset();
|
|
|
- log.info("------- 发送消息成功,主题 topic 为:" + topic + ",分区 partition 为:" + partition + ",偏移量为:" + offset + ",消息体为:" + kafkaParameter.getData());
|
|
|
- }, failure -> {
|
|
|
- log.error("发送消息失败:" + failure.getMessage());
|
|
|
- });
|
|
|
+ log.info("发送消息成功,主题 topic 为:" + topic + ",分区 partition 为:" + partition + ",偏移量为:" + offset + ",消息体为:" + kafkaParameter.getData());
|
|
|
+ }, failure -> log.error("发送消息失败:" + failure.getMessage()));
|
|
|
} catch (NumberFormatException e) {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
@@ -354,7 +351,6 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
|
|
|
String data = JsonUtil.beanToJson(kafkaParam);
|
|
|
kafkaParameter.setData(data);
|
|
|
log.info("推送项目中止消息到kafka:" + data);
|
|
|
-// kafkaService.send(kafkaParameter);
|
|
|
kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
|
|
|
// 消息发送到的topic
|
|
|
String topic = success.getRecordMetadata().topic();
|
|
@@ -362,15 +358,9 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
|
|
|
int partition = success.getRecordMetadata().partition();
|
|
|
// 消息在分区内的offset
|
|
|
long offset = success.getRecordMetadata().offset();
|
|
|
- log.info("------- 发送消息成功:\n"
|
|
|
- + "主题 topic 为:" + topic + "\n"
|
|
|
- + "分区 partition 为:" + partition + "\n"
|
|
|
- + "偏移量为:" + offset + "\n"
|
|
|
- + "消息体为:" + kafkaParameter.getData());
|
|
|
- }, failure -> {
|
|
|
- log.error("发送消息失败:" + failure.getMessage());
|
|
|
- });
|
|
|
-
|
|
|
+ log.info("------- 发送消息成功:主题 topic 为:" + topic + "分区 partition 为:" + partition + "偏移量为:"
|
|
|
+ + offset + "消息体为:" + kafkaParameter.getData());
|
|
|
+ }, failure -> log.error("发送消息失败:" + failure.getMessage()));
|
|
|
}
|
|
|
|
|
|
private void autoProjectStopToKafka(SimulationAutomaticSubProjectPo po) {
|