@@ -87,7 +87,10 @@ public class TickScheduler {
         assert retry != null;
         int retryNumber = Integer.parseInt(retry);
         if (retryNumber < 3) {
+            log.info("------- TickScheduler--retry preparing retry attempt " + retryNumber + "!");
             String taskJson = redisTemplate.opsForValue().get(manualProjectTopic + ":" + projectId + ":" + taskId + ":message");
+            int finalRetryNumber = retryNumber + 1;
+            redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":" + taskId + ":retry", finalRetryNumber + "");
             kafkaTemplate.send(projectId, taskJson).addCallback(success -> {
                 // topic the message was sent to
                 assert success != null;
@@ -96,13 +99,14 @@ public class TickScheduler {
                 int partition = success.getRecordMetadata().partition();
                 // offset of the message within its partition
                 long offset = success.getRecordMetadata().offset();
-                log.info("------- TickScheduler--retry message sent successfully:\n"
+                log.info("------- TickScheduler--retry the task of project " + projectId + " is starting retry attempt " + finalRetryNumber + ", "
+                        + "message sent successfully:\n"
                         + "topic: " + topic + "\n"
                         + "partition: " + partition + "\n"
                         + "offset: " + offset + "\n"
                         + "message body: " + taskJson);
             }, failure -> {
-                log.error("------- ManualProjectConsumer failed to send message: " + failure.getMessage());
+                log.error("------- TickScheduler--retry failed to send message: " + failure.getMessage());
             });
         }
     }
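For orientation, here is a minimal sketch of how the retry path reads once this diff is applied. Only the body of the if (retryNumber < 3) branch is taken from the hunk; the method signature, the field wiring (log, redisTemplate, kafkaTemplate, manualProjectTopic), the read of the ":retry" key, and the condensed success log are assumptions about the surrounding class, not shown in the diff.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class TickScheduler {

    private static final Logger log = LoggerFactory.getLogger(TickScheduler.class);

    @Autowired
    private StringRedisTemplate redisTemplate;            // assumed type
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;  // assumed type (Spring Kafka 2.x ListenableFuture API, per addCallback in the diff)
    private String manualProjectTopic = "manualProject";  // assumed value

    public void retry(String projectId, String taskId) {  // assumed signature
        String keyPrefix = manualProjectTopic + ":" + projectId + ":" + taskId;
        // Assumed: the retry counter is read from the same ":retry" key the diff writes to.
        String retry = redisTemplate.opsForValue().get(keyPrefix + ":retry");
        assert retry != null;
        int retryNumber = Integer.parseInt(retry);
        if (retryNumber < 3) {
            log.info("------- TickScheduler--retry preparing retry attempt " + retryNumber + "!");
            String taskJson = redisTemplate.opsForValue().get(keyPrefix + ":message");
            // Bump the counter in Redis before resending, so the attempt is counted
            // even if the asynchronous send callback never fires.
            int finalRetryNumber = retryNumber + 1;
            redisTemplate.opsForValue().set(keyPrefix + ":retry", finalRetryNumber + "");
            kafkaTemplate.send(projectId, taskJson).addCallback(
                    success -> log.info("------- TickScheduler--retry attempt " + finalRetryNumber
                            + " sent, offset " + success.getRecordMetadata().offset()),
                    failure -> log.error("------- TickScheduler--retry failed to send message: "
                            + failure.getMessage()));
        }
    }
}

Note that the counter is incremented before the send completes, so a failed resend still consumes one of the three allowed attempts rather than risking an unbounded retry loop.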