我用的是Kafka的高级消费者。因为我将kafka用作我的应用程序的“事务队列”,所以我需要绝对确保不会错过或重读任何消息。我有两个问题:
如何将偏移提交给zookeeper?在成功使用每条消息后,我将关闭自动提交和提交偏移量。我似乎找不到实际的代码示例来说明如何使用高级使用者来实现这一点。有人能帮我吗?
另一方面,我听说提交给zookeeper可能很慢,所以另一种方法可能是本地跟踪偏移量?这种替代方法可取吗?如果是,您将如何处理?
我用的是Kafka的高级消费者。因为我将kafka用作我的应用程序的“事务队列”,所以我需要绝对确保不会错过或重读任何消息。我有两个问题:
如何将偏移提交给zookeeper?在成功使用每条消息后,我将关闭自动提交和提交偏移量。我似乎找不到实际的代码示例来说明如何使用高级使用者来实现这一点。有人能帮我吗?
另一方面,我听说提交给zookeeper可能很慢,所以另一种方法可能是本地跟踪偏移量?这种替代方法可取吗?如果是,您将如何处理?
2条答案
按热度按时间pgpifvop1#
您可以先禁用自动提交:
auto.commit.enable=false
获取消息后提交:consumer.commitOffsets(true)
gxwragnw2#
中有两个相关设置http://kafka.apache.org/documentation.html#consumerconfigs.
和
如果您想将其设置为使用者在每条消息之后提交偏移量,这将很困难,因为唯一的设置是在计时器间隔之后,而不是在每条消息之后。您必须对传入的消息进行一些速率预测,并相应地设置时间。
一般来说,不建议将此间隔保持得太小,因为它会极大地提高zookeeper中的读/写速率,而zookeeper会变慢,因为它在其仲裁中具有很强的一致性。