扩展Kafka消费者池,同时确保所有消息都得到处理

rggaifut  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(344)

我想在传统的消息队列配置中使用kafka。即:
生产者可以在消费者开始之前就开始写这个主题
一个给定的主题有很多消费者
可以添加消费者以提高吞吐量
不管配置如何,最终都会处理所有消息
答:第一点的意思是,我认为我需要从 auto.offset.reset = earliest 否则我将错过在消费者启动之前发送的消息。
第3点意味着我可以通过增加消费者来重新平衡。假设a点是正确的,我应该使用哪个值 auto.offset.reset ?
b:如果我准备好了 auto.offset.reset = earliest 那么我会处理重复的消息吗(我知道重新平衡的行为本身可能会导致重复的消息处理)。
c:或者我应该设置 auto.offset.reset = latest ? 如果是这样,Kafka会确保所有的信息都得到处理吗?
d:这是否意味着我需要一个不同的配置值 auto.offset.reset 对于不同时间开始的消费者?
我使用的是当前版本0.10.2.1。

qvtsj1bj

qvtsj1bj1#

auto.offset.reset 当zookeeper中没有初始偏移或偏移超出范围时使用。这只是第一次真正的消费者群体开始,zookeeper将缺乏一个抵消。例如,如果一个消费者组中的所有消费者都在比主题保留期更长的时间内处于停机状态,则可能会发生超出范围的偏移。因此,通过适当地配置保留时间,您将不会看到由此导致的超出范围的偏移。
如果你用过 auto.offset.reset = earliest 然后在没有初始偏移量或超出范围偏移量的情况下,就有可能重新处理消息。但是,由于您必须处理重复的可能性(因为kafka至少提供了一次语义),这应该没有什么不同。
如果你用过 auto.offset.reset = latest 在没有初始偏移量或偏移量超出范围的情况下,您可能无法处理某些消息。

arknldoa

arknldoa2#

在Kafka建立消费者有两个基本原则, Consumer 以及 Consumer Group . 但是 auto.offset.reset 保持不变。
答:第1点意味着我需要使用auto.offset.reset=earlish启动初始消费者,否则我将错过消费者启动前发送的消息。
第3点意味着我可以通过增加消费者来重新平衡。假设a点是正确的,我应该使用哪个值来进行auto.offset.reset?
我想你是在同一个群体中增加新的消费者。在这种情况下,您必须增强主题的分区,否则,新的使用者将处于空闲状态。因为,一个主题上的一个分区只能由一个使用者组中的一个使用者使用。
如果新使用者需要在主题启动前使用添加到主题中的消息,则应设置使用者组 auto.offset.reset = earliest . 此配置仅在特定分区没有提交的偏移量时起作用(在重新平衡的情况下)。
b:如果我设置auto.offset.reset=earliest,那么我会处理重复的消息吗(我知道重新平衡的行为本身可能会导致重复的消息处理)。
否。在使用者重新平衡的情况下,它将获取分配分区的最新提交偏移量。它使用 auto.offset.reset = earliest 没有提交的偏移量的场景中的配置。
c:还是应该设置auto.offset.reset=最新?如果是这样,Kafka会确保所有的信息都得到处理吗?
请阅读我对b点的评论。
希望对你有帮助。

相关问题