我想告诉Kafka,当我的消费者已经成功地处理了一个记录,所以我已经关闭了自动提交设置 enable.auto.commit
错误的。我订阅了两条关于某个主题的消息,偏移量分别为0和1,并创建了一个使用者,以便每次调用 poll
最多返回一条记录(通过设置 max.poll.records
到1)。
我现在打电话 consumer.poll(5000)
收到第一条信息,但我不承认;我不打电话 commitSync
或者 commitAsync
. 如果我现在打电话 consumer.poll(5000)
同样,使用同一个消费者,我希望得到与我刚才读到的完全相同的消息,但是,相反,我收到了第二条消息。
我怎么才能得到 consumer.poll
一直发同样的信息直到我明确承认?
1条答案
按热度按时间vom3gejh1#
你所描述的是预期的行为。每次你打电话
poll()
,它将返回下一条消息。您提交的偏移量仅在连接新使用者时使用,以便它知道从何处(重新)开始。在messagehub中,我们设置了
session.timeout
到30秒。所以你需要打电话poll()
稍微快一点以避免断开连接。如果您的处理时间比这长,那么我可以考虑两个选项:使用kafka 0.10.2并设置
max.poll.interval.ms
告诉你的Kafka客户保持会话活动(而不必打电话)poll()
)处理上一条记录时(此功能是在0.10.1中添加的,但我们不支持该版本。0.10.2可以工作,因为它能够与0.10.0代理一起工作)使用seek()移回之前的偏移量
poll
所以它一直返回相同的记录。希望这有帮助!