假设我正在使用 log.message.timestamp.type=LogAppendTime
.
还假设在第一次读取期间每个主题/分区的消息数: topic0:partition0
: 5 topic0:partition1
: 0 topic0:partition2
: 3 topic1:partition0
: 2 topic1:partition1
: 0 topic1:partition2
: 4
第二次阅读时: topic0:partition0
: 5 topic0:partition1
: 2 topic0:partition2
: 3 topic1:partition0
: 2 topic1:partition1
: 4 topic1:partition2
: 4
如果我从每个分区读取第一条消息,kafka是否保证从每个分区再次读取不会返回比我在第一次读取时读取的消息旧的消息?
专注于 topic0:partition1
以及 topic1:partition1
在第一次读取时没有任何消息,但在第二次读取时有。
1条答案
按热度按时间bzzcjhmw1#
kafka保证了分区级别的消息排序,因此您的用例完全符合kafka的体系结构。
这里有一些概念需要解释。首先,您拥有由
auto.offset.reset
参数。只有当该组没有保存的偏移量,或者保存的偏移量不再有效(例如,如果已被保留策略删除)时,此操作才会生效。通常情况下,只有当您启动一个新的消费群(并且您想确定它是从最旧的消息开始,还是从当前最新的消息开始)时,您才应该担心这个问题。
关于你的例子,在正常情况下(没有消费者停机等),你没有什么可担心的。
Consumers
同一时间内consumer group
将只读取他们的消息一次,无论分区的数量或使用者的数量如何。这些使用者会记住最后一次读取的偏移量,并定期将其保存在_consumer_offsets
主题。有两个属性定义此定期记录:
启用.auto.commit
设置为
true
(这是默认值)将允许自动提交到_consumer_offsets
主题。自动提交间隔毫秒
定义提交偏移的时间。例如,值为
10000
,消费偏移量将每10秒存储一次。您还可以将enable.auto.commit设置为false,并以自己的方式存储偏移量(例如存储到数据库等),但这是一个更特殊的用例。
自动补偿提交将允许您停止您的消费者,并在以后再次启动他们,而不会丢失任何消息,也不会重新处理已处理的消息(这就像一本书的页面中的标记)。如果你不阻止你的消费者(而且没有经纪人/Zookeeper/消费者的任何错误),你的担心就更少了。
欲了解更多信息,请查看:https://docs.confluent.io/current/clients/consumer.html#concepts
希望有帮助!