所以我有一个springboot端点控制器,它是这样开始的:
@RequestMapping(value = "/post", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public Response post(@Valid @RequestBody Message message) throws FailedToPostException {
message.setRecieveTime(System.currentTimeMillis());
return this.service.post(message);
}
以及post函数:
public Response post(Message message) throws FailedToPostException{
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("topicName", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
@Override
public void onSuccess(SendResult<String, Message> result) {
LOGGER.info("Post Finished. '{}' with offset: {}", message,
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error("Message Post Failed. '{}'", message, ex);
long nowMillis = System.currentTimeMillis();
int diffSeconds = (int) ((nowMillis - message.getRecieveTime()) / 1000);
if (diffSeconds >= 10) {
LOGGER.debug("timeout sending message to Kafka, aborting.");
return;
}
else {
post(message);
}
}
});
LOGGER.debug("D: " + Utils.getMetricValue("buffer-available-bytes", kafkaTemplate));
return new Response("Message Posted");
}
现在你可以看到,我们正在努力确保 kafkaTemplate.send
失败,我们将递归调用 post(message)
再次长达10秒,直到生产者内存缓冲区清除,消息通过。
问题是:
我们希望能够将失败响应返回给端点的客户端(例如:“failed to acknowledge the message”)。
在上面这样的代码中,有没有更好的方法来处理来自未来的异常?
有没有办法避免在这里使用递归函数?我们这样做是因为我们想在把信息以电子邮件的形式发送给Kafka之前,先把它发送给Kafka10秒钟。
旁注:我还是没用 buffer-available-bytes
属性,我打算使用它来最小化出现这个问题的可能性,但是仍然需要处理上面的问题,以防出现某些竞争条件
1条答案
按热度按时间nue99wik1#
有一些方法可以做到这一点,但我真的很喜欢
Spring Retry
作为解决这类问题的一种方法。这是一段伪代码,但如果你需要更多的细节,我可以让事情更明确:在不递归的情况下有效地执行相同的操作。我不建议用递归代码来处理错误。
控制器应该能够按摩实际的
KafkaSendException
带着一个漂亮的@ExceptionHandler
.