我写了一个Kafka消费者。在初始化期间,我给予引导服务器ip的列表,格式为ip1:9092、ip2:9092、ip3:9092。在一段时间内,可以用另一个ip4和ip5替换ip2和ip3。但是我们可能无法更新给客户端的引导服务器列表。理想情况下,只要有至少一个工作节点ip,Kafka consumer就可以工作。
在我的应用程序中,我正在为一个主题调用fetterfly partition(这个问题并不特定于这个调用)。为了获取所有分区列表,Kafka consumer首先找出负载最小的节点,因为ip2和ip3没有任何连接,它们会随机出现在顶部(因为在启动期间,每个节点都是负载最小的),当Kafka consumer调用ip2或ip3时,它会不断重试,最终default.api.timeout.ms限制命中,我们得到apitimeout异常。
下面是我在日志中得到的错误
Timeout expired while fetching topic metadata
我有一些疑问
1.是否可以为节点设置连接超时?这样我们就不会一直再试
1.是否可以设置连接到节点的重试次数限制?
1条答案
按热度按时间a1o7rhls1#
After this pr
socket.connection.setup.timeout.ms
可用于设置较低的连接建立超时。此更改自Kafka客户端版本2.7起可用。
This wiki有更多实现细节。