我阅读了关于为kafka消费者使用pause和resume方法的文档,它们似乎很容易实现。但是,我是否需要另一个线程在暂停时继续调用poll()方法以满足心跳要求而不触发重新平衡?
我的使用者在轮询主题后正在运行sql脚本,根据返回的消息,脚本可能需要比当前session.timeout.ms间隔更长的时间(我们增加了此值,但脚本运行的时间长度可能会有点变化,无论间隔是什么,我们都会不时超过它)。我还希望避免重新平衡,因为安全排序和数据完整性比吞吐量和错误保留更重要。
我阅读了关于为kafka消费者使用pause和resume方法的文档,它们似乎很容易实现。但是,我是否需要另一个线程在暂停时继续调用poll()方法以满足心跳要求而不触发重新平衡?
我的使用者在轮询主题后正在运行sql脚本,根据返回的消息,脚本可能需要比当前session.timeout.ms间隔更长的时间(我们增加了此值,但脚本运行的时间长度可能会有点变化,无论间隔是什么,我们都会不时超过它)。我还希望避免重新平衡,因为安全排序和数据完整性比吞吐量和错误保留更重要。
2条答案
按热度按时间aelbi1ox1#
是的,你需要继续打电话
poll()
在使用者上,即使您暂停所有分区,或者它将被踢出其成员所在的任何使用者组,并且其分配的分区将转移到另一个使用者。至于哪个线程最终调用poll—这无关紧要(只要一次只有一个线程与使用者交互)引用kip-62:
最大轮询间隔毫秒。此配置设置客户端调用poll()之间的最大延迟。当超时过期时,使用者将停止发送心跳并发送显式的leavegroup请求。
kh212irz2#
从版本0.10.1.0开始,心跳信号通过单独的线程发送,因此暂停进程线程不会影响心跳信号线程。
您可以查看此以了解更多信息。