使用apachekafkajava客户机(0.9),我尝试使用kafka producer类向代理发送一长串记录。异步send方法立即返回一段时间,然后开始在短时间内阻塞每个调用。大约30秒后,客户机开始抛出异常(timeoutexception),并显示消息“batch expired”。什么情况导致抛出此异常?
1u4esq0p1#
控制发送到代理之前的时间的参数是 linger.ms . 其默认值为0(无延迟)。
linger.ms
tnkciper2#
我解决了。我的kafka部署在docker容器中,容器的网络模式是网桥,主机和容器使用端口Map,我将kafka服务器的默认端口更改为9102。server.properties中解决此问题的配置项有以下两个:listeners adverted.listeners我尝试了几种组合:成功:
listeners=PLAINTEXT://:9102 advertised.listeners=PLAINTEXT://192.168.0.136:9102
服务器无法启动:
listeners=PLAINTEXT://192.168.0.136:9102 advertised.listeners=PLAINTEXT://192.168.0.136:9102
超时错误:
listeners=PLAINTEXT://:9102 advertised.listeners=PLAINTEXT://:9102
ryevplcw3#
创建使用者时,请将consumerconfig.enable\u auto\u commit\u config设置为true。
bejyjqdl4#
我在完全不同的背景下得到了这个例外。我已经设置了一个由zookeeper vm、broker vm和producer/consumer vm组成的小型集群。我打开了服务器(9092)和zookeeper(2181)上所有必要的端口,然后尝试将消息从使用者/发布者vm发布到代理。我得到了op提到的异常,但是由于到目前为止我只发布了一条消息(或者至少我尝试过发布),所以解决方案不能是增加超时或批处理大小。所以我搜索了一下,发现这个邮件列表描述了一个类似的问题,我在尝试使用consumer/producer vm中的消息时遇到了这个问题(closedchannelexception):http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config 这个邮件列表中的最后一篇文章实际上描述了如何解决这个问题问题。长话短说,如果你同时面对 ChannelClosedException 以及 Batch Expired 例外情况下,您可能必须在 server.config 归档并重新启动代理:
ChannelClosedException
Batch Expired
server.config
advertised.host.name=<broker public IP address>
如果没有设置,它将返回到 host.name 属性(可能也没有设置),然后返回到 InetAddress java类,这最后当然是不正确的,从而混淆了远程节点。
host.name
InetAddress
f8rj6qna5#
此异常表示您正在以比发送记录更快的速度对记录进行排队。调用send方法时,producerrecord将存储在内部缓冲区中,以便发送到代理。一旦producerrecord被缓冲,不管它是否被发送,该方法都会立即返回。记录被分组成批发送到代理,以减少每条消息的传输开销并提高吞吐量。一旦记录被添加到一个批中,就有一个发送该批的时间限制,以确保它在指定的持续时间内被发送。这由producer配置参数request.timeout.ms控制,默认值为30秒。如果批处理的排队时间超过超时限制,则将引发异常。该批处理中的记录将从发送队列中删除。使用配置参数增加超时限制将允许客户机在批处理过期之前将其排队更长时间。
e5nqia276#
我使用的是kafka java客户端版本0.11.0.0。我也开始看到同样的模式,即无法始终如一地生成大量消息。它传递的信息很少,而其他一些则没有(虽然传递的消息和失败的消息大小相同),但在我的例子中,每条消息的大小都在60kb左右,这远远高于kafka的默认值 batch.size 16kb,也是我的 linger.ms 已设置为默认值0。由于生产者客户端在从服务器接收成功响应之前超时,因此引发此错误。基本上,在我的代码中,此调用超时: kafkaProd.send(pr).get() . 为了解决这个问题,我不得不增加生产者客户的违约 request.timeout.ms 至60000
batch.size
kafkaProd.send(pr).get()
request.timeout.ms
kadbb4597#
Kafka也有类似的问题。我的docker-compose.yml被设置为
KAFKA_ADVERTISED_HOST_NAME: kafka ports: - 9092:9092
但当我试图从码头外给 Camel 发信息时
to("kafka:test?brokers=localhost:9092")
我有个超时例外。我通过添加
127.0.0.1 kafka
到windows\system32\drivers\etc\hosts文件,然后将我的camel url更改为
to("kafka:test?brokers=kafka:9092")
7条答案
按热度按时间1u4esq0p1#
控制发送到代理之前的时间的参数是
linger.ms
. 其默认值为0(无延迟)。tnkciper2#
我解决了。
我的kafka部署在docker容器中,容器的网络模式是网桥,主机和容器使用端口Map,我将kafka服务器的默认端口更改为9102。
server.properties中解决此问题的配置项有以下两个:listeners adverted.listeners
我尝试了几种组合:
成功:
服务器无法启动:
超时错误:
ryevplcw3#
创建使用者时,请将consumerconfig.enable\u auto\u commit\u config设置为true。
bejyjqdl4#
我在完全不同的背景下得到了这个例外。
我已经设置了一个由zookeeper vm、broker vm和producer/consumer vm组成的小型集群。我打开了服务器(9092)和zookeeper(2181)上所有必要的端口,然后尝试将消息从使用者/发布者vm发布到代理。我得到了op提到的异常,但是由于到目前为止我只发布了一条消息(或者至少我尝试过发布),所以解决方案不能是增加超时或批处理大小。所以我搜索了一下,发现这个邮件列表描述了一个类似的问题,我在尝试使用consumer/producer vm中的消息时遇到了这个问题(closedchannelexception):http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config 这个邮件列表中的最后一篇文章实际上描述了如何解决这个问题问题。
长话短说,如果你同时面对
ChannelClosedException
以及Batch Expired
例外情况下,您可能必须在server.config
归档并重新启动代理:如果没有设置,它将返回到
host.name
属性(可能也没有设置),然后返回到InetAddress
java类,这最后当然是不正确的,从而混淆了远程节点。f8rj6qna5#
此异常表示您正在以比发送记录更快的速度对记录进行排队。
调用send方法时,producerrecord将存储在内部缓冲区中,以便发送到代理。一旦producerrecord被缓冲,不管它是否被发送,该方法都会立即返回。
记录被分组成批发送到代理,以减少每条消息的传输开销并提高吞吐量。
一旦记录被添加到一个批中,就有一个发送该批的时间限制,以确保它在指定的持续时间内被发送。这由producer配置参数request.timeout.ms控制,默认值为30秒。
如果批处理的排队时间超过超时限制,则将引发异常。该批处理中的记录将从发送队列中删除。
使用配置参数增加超时限制将允许客户机在批处理过期之前将其排队更长时间。
e5nqia276#
我使用的是kafka java客户端版本0.11.0.0。我也开始看到同样的模式,即无法始终如一地生成大量消息。它传递的信息很少,而其他一些则没有(虽然传递的消息和失败的消息大小相同),但在我的例子中,每条消息的大小都在60kb左右,这远远高于kafka的默认值
batch.size
16kb,也是我的linger.ms
已设置为默认值0。由于生产者客户端在从服务器接收成功响应之前超时,因此引发此错误。基本上,在我的代码中,此调用超时:kafkaProd.send(pr).get()
. 为了解决这个问题,我不得不增加生产者客户的违约request.timeout.ms
至60000kadbb4597#
Kafka也有类似的问题。我的docker-compose.yml被设置为
但当我试图从码头外给 Camel 发信息时
我有个超时例外。我通过添加
到windows\system32\drivers\etc\hosts文件,然后将我的camel url更改为