我正在使用一个只有一个分区和消费者组的主题来模拟分布式锁,主动示例在那里发布心跳,被动示例消费它们,当一段时间内没有心跳发送时,被动示例变为主动。
主题的retention.ms = 5分钟,segment.ms = 5分钟,使用者的auto.offset.reset =最早,enable.auto.commit =假,最大轮询记录数=150000。
偏移量永远不会提交。我希望消费者在每次轮询时都能获得主题中的所有数据。问题是有时轮询并不会返回主题中的所有数据。
在什么情况下民意调查会这样做,我如何避免?
1条答案
按热度按时间dy1byipe1#
除非调用
consumer.seekToBeginning()
或使用auto.offset.reset=earliest
和enable.auto.commit=false
重新启动消费者,否则每次轮询都将在消费者进程处于活动状态时从上一批处理的以下偏移标记处开始。Zookeeper可能是分布式锁的一个更好的选择,并且它是一个有良好文档记录的模式,或者as mentioned before, etcd。
如果你使用一个主题来维护某个状态,那么任何组中只有一个用户可以读取该数据。要从该主题查询数据,你可以构建一个KTable并实现交互式查询。