在joingroup期间,是否可以要求kafka等待小于max.poll.interval.ms?

cunj1qz1  于 2021-06-04  发布在  Kafka
关注(0)|答案(3)|浏览(444)

背景:我有一个应用程序,运行在kubernetes中,它使用kafka作为一个集中式消息总线。我的应用程序中的kafka客户端可能非常慢。在将成员踢出组并重新平衡之前,kafka代理在后续poll()调用之间等待的最长时间由 max.poll.interval.ms .
对于此应用程序中的大多数工作者,我可以设置 max.poll.interval.ms 几分钟的时间。然而,对于在慢班的工人,我需要设置为几个小时。
当一切正常工作时,这不会引起问题。但是,在网络中断或间歇性崩溃的情况下,我注意到最大轮询间隔非常大的工作人员可能会在重新平衡时“卡住”。如果我在这种情况下看着经纪人,然后执行

kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --describe --members

然后我看到经纪人在等待一群不再存在的工人(我确信是这样,因为我设置了 group.instance.id 到kubernetes pod主机名,这样我就可以验证卡住的组成员是否真的消失了)。
通过这个问题,我看到kip-266说“joingroup api将被视为一个特例,它的超时将被设置为一个从max.poll.interval.ms派生的值。”那么,我猜发生的事情是,我的工作人员正在发送一个joingroup,就在与代理断开连接之前,这会导致经纪人等待 max.poll.interval.ms 在将他们标记为死亡并允许新工人重新平衡之前。
当这种情况发生的时候,我似乎必须把Kafka的经纪人带回来,以便清除死去的成员。。。否则,当代理等待完全超时时,所有处理都会被阻塞几个小时。这些都是糟糕的解决方案,我对此都不满意。
我的问题是:有没有一个环境我可以调整,以鼓励Kafka等待不到一个小时 max.poll.interval.ms 在放弃加入小组的请求之前?如果这意味着在网络中断后重新平衡会有一点波动,因为非常慢的消费者很晚才重新加入这个群体,那么我可以接受。如果没有这样的机制,我应该如何重构我的系统来避免我看到的问题?
我在经营合流Kafka confluentinc/cp-kafka:5.4.1 ,似乎是Kafka2.4.0。

46qrfjad

46qrfjad1#

我还没有解决这个问题(似乎没有解决办法),但我可能已经找到了一个方法来改进一些事情:设置 group.instance.id 到kubernetes主机名,并在kubernetes中使用statefulset,以便特定worker的主机名是稳定的。这样,当一个工人撞车并重新加入时,希望Kafka能够认出他是同一个工人,而不是徘徊等待鬼魂。

vybvopom

vybvopom2#

我最终的解决办法是搬到Apache脉冲星。
pulsar允许单独确认消息,这就解决了问题。

gzszwxb4

gzszwxb43#

在kafka中,当消费者组中开始重新平衡时,此消费者组中的所有消费者都将被吊销,kafka将等待所有活动消费者(发送心跳的消费者)进行poll()(为吊销的消费者调用poll意味着joingrouprequest)。重要的是:

rebalance timeout = max.poll.interval.ms

这是无法改变的。事实上,这是合理的,因为Kafka等待活着的消费者完成它的工作,并重新加入该集团。因此,当所有活动的消费者发送joingrouprequests或发生重新平衡超时时,重新平衡就完成了。
在重新平衡过程中,由于已撤消使用者组中的所有使用者,因此将停止此使用者组的消费操作。因此,作为一个好的实践,应该避免长时间运行的进程。
因此:
长时间运行的进程lead->long max.poll.interval.ms time lead->long rebalance time

相关问题