我有一个情况,我有一个Kafka消费者检索Kafka使用投票机制的记录。有时,由于在session.timeout期间(我已配置为30秒)调用poll失败,此使用者会被踢出使用者组。我的问题是,如果发生这种情况,在稍后的某个时间点,民意调查会将消费者重新添加到组中,还是我需要做其他事情?
我使用的是Kafka版本0.10.2.1
编辑:2018年8月14日
更多信息。在我做了一个投票后,我从来没有在同一个线程中处理记录。我只需将所有记录添加到单独的队列(由单独的线程池提供服务)中进行处理。
我有一个情况,我有一个Kafka消费者检索Kafka使用投票机制的记录。有时,由于在session.timeout期间(我已配置为30秒)调用poll失败,此使用者会被踢出使用者组。我的问题是,如果发生这种情况,在稍后的某个时间点,民意调查会将消费者重新添加到组中,还是我需要做其他事情?
我使用的是Kafka版本0.10.2.1
编辑:2018年8月14日
更多信息。在我做了一个投票后,我从来没有在同一个线程中处理记录。我只需将所有记录添加到单独的队列(由单独的线程池提供服务)中进行处理。
3条答案
按热度按时间ckocjqey1#
如果消费者还不是组的成员,则poll将启动“加入组”请求,并将导致消费者加入组(除非某些错误情况阻止了它)。请注意,根据组状态(组中的其他成员、组中的订阅主题),使用者可能会或可能不会获得与被踢出之前所使用的分区相同的分区。如果消费者是组中唯一的消费者,则情况并非如此。
yqhsw0fo2#
如果消费者未能在指定的时间段内发送心跳信号,则会被踢出局。每一次投票都会给消费者群体协调员发送一次心跳。
你需要看看处理你的单张唱片要花多少时间。可能是超过了
session.timeout.ms value
你把它定为30秒。试着增加这个。同时保持max.poll.records
到一个较低的值。此设置确定在调用poll方法之后提取多少条记录。如果您获取了太多的记录,那么即使您将session.timeout.ms保持在一个较大的值,您的消费者仍然可能被踢出,组将进入重新平衡阶段。eivgtgni3#
瓦希德已经提到了当一个被驱逐的消费者重新加入这个组织时会发生什么。您还可以调整下面的配置,这样消费者就不会被踢出组。
max.poll.records—提供轮询循环中预定义的记录数(默认值:500)
max.poll.interval.ms—它为您提供处理在中接收的消息所需的时间量
poll
. (默认值:5分钟)您可以在kip-62中看到更新上述配置的影响
或者,您可以使用
KafkaConsumer#assign
模式,正如您所提到的,您只使用一个消费者。此模式不会进行任何重新平衡。