我正在尝试实现一个简单的服务,从kafka中提取消息,将它们 Package 在一些数据中并发送到外部服务。在处理消息时,处理外部服务不可用的常见模式是什么?到目前为止,我只是在请求外部服务成功时才手动提交消息。我想Kafka重新发送消息一段时间后,如果它没有提交,以便处理外部服务失败是透明的消费者。但我找不到办法。然而,我很好奇,如果我没有做一些反模式和有更好的解决办法。
aurhwmvo1#
首先,你需要考虑,Kafka是以拉为基础的。因此,如果您想再次收到消息,您需要 seek() 其偏移量和 poll() .此外,如果要停止处理消息,可以 pause() 分区和更高版本 resume() 他们。请参阅consumer javadoc中的“消费流控制”部分:https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html因此,如果您的外部服务已关闭,请暂停并等待它恢复。
seek()
poll()
pause()
resume()
1条答案
按热度按时间aurhwmvo1#
首先,你需要考虑,Kafka是以拉为基础的。因此,如果您想再次收到消息,您需要
seek()
其偏移量和poll()
.此外,如果要停止处理消息,可以
pause()
分区和更高版本resume()
他们。请参阅consumer javadoc中的“消费流控制”部分:https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html因此,如果您的外部服务已关闭,请暂停并等待它恢复。