Kafka在高负荷下的平衡问题

zd287kbt  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(234)

使用kafka版本2.11-0.11.0.3发布10000条消息(所有消息的总大小为10mb),将有2个消费者(具有相同的组id)将消息作为并行处理进行消费。在消费时,两个消费者消费了相同的消息。
下面是Kafka抛出的错误/警告
警告:此成员将离开组,因为使用者轮询超时已过期。这意味着对poll()的后续调用之间的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环在处理消息方面花费了太多时间。您可以通过增加max.poll.interval.ms或使用max.poll.records减少poll()中返回的批的最大大小来解决此问题。
信息:尝试检测心跳失败,因为组正在重新平衡
信息:正在向协调器发送leavegroup请求
警告:偏移量{insect-data-1=offsetandmetadata{offset=5506,leaderepoch=null,metadata=''}}的同步自动提交失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环在消息处理上花费了太多时间。您可以通过增加max.poll.interval.ms或使用max.poll.records减少poll()中返回的批的最大大小来解决此问题。
向Kafka提供了以下配置
服务器属性

max.poll.interval.ms=30000
group.initial.rebalance.delay.ms=0
group.max.session.timeout.ms=120000
group.min.session.timeout.ms=6000

消费者财产

session.timeout.ms=30000 
request.timeout.ms=40000

为了解决多重消费,应该改变什么?

8iwquhpp

8iwquhpp1#

你的消费者是同一个群体吗?如果是,那么如果消费者离开/死亡/超时而没有提交它已处理的一些消息,您将有多个消费。
如果您的所有消息都被两个使用者使用,那么您可能没有为它们设置相同的组id。
更多信息:
因此,您为所有消费者设置了相同的组id,很好。您所处的情况是,集群/代理认为某个使用者已死亡,因此将负载重新平衡到另一个使用者。另一个将从上次提交的位置开始使用。
因此,假设使用者c琰a从分区p琰1读取偏移量高达100,然后处理它们,然后提交'100',然后读取偏移量高达200,然后处理它们,但无法提交,因为代理认为c琰a已死亡。
代理将分区p_1重新分配给使用者c_b,该使用者c_b将从组的最后一次提交开始,即100,将读取到200,处理并提交200。
所以你的问题是如何避免消费者被认为是死的(我假设它没有死)?
答案已经出现在您的问题中的黄色警告消息中:您可以告诉您的消费者在一次轮询中消耗更少的消息(max.poll.records),以减少对代理的两次轮询之间的处理时间,和/或您可以增加max.poll.interval.ms,告诉代理在认为您的消费者已死亡之前等待更长时间。。。

相关问题