Kafka 0.10 Java client TimeoutException: Batch containing 1 record(s) expired

cbjzeqam  posted on 2021-06-08  in Kafka

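I have a single-node, multi-broker (3) ZooKeeper/Kafka setup. I am using the Kafka 0.10 Java client.
I wrote the following simple remote producer (it runs on a different server from Kafka; in the code I replaced my public IP address with MYIP):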

    Properties config = new Properties();
    try {
        config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<String, byte[]>(config);
        Schema.Parser parser = new Schema.Parser();
        schema = parser.parse(GATEWAY_SCHEMA);
        recordInjection = GenericAvroCodecs.toBinary(schema);
        GenericData.Record avroRecord = new GenericData.Record(schema);
        //Filling in avroRecord (code not here)
        byte[] bytes = recordInjection.apply(avroRecord);
        Future<RecordMetadata> future = producer.send(new ProducerRecord<String, byte[]>(datasetId+"", "testKey", bytes));
        RecordMetadata data = future.get();
    } catch (Exception e) {
        e.printStackTrace();
    }

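The server properties for the 3 brokers look like the following (across the 3 server properties files, broker.id is 0, 1, 2; listeners is PLAINTEXT://:9092, PLAINTEXT://:9093, PLAINTEXT://:9094; and host.name is 10.2.0.4, 10.2.0.5, 10.2.0.6). This is the first server properties file: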

    broker.id=0
    listeners=PLAINTEXT://:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka1-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=6000

When I run the code, I get the following exception:

    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at com.nr.roles.gateway.GatewayManager.addTransaction(GatewayManager.java:212)
        at com.nr.roles.gateway.gw.service(gw.java:126)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:821)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1158)
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1090)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:109)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:119)
        at org.eclipse.jetty.server.Server.handle(Server.java:517)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:308)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:242)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:261)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
        at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:75)
        at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
        at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0

Does anyone know what I am missing? Any help would be appreciated. Thanks.

monwx1rj  #1

The port information in the BOOTSTRAP_SERVERS_CONFIG setting is not correct (MYIP:9092).
As you mentioned, server.properties has "PLAINTEXT://:9093, PLAINTEXT://:9093, PLAINTEXT://:9094".

t9eec4r0  #2

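I ran into the same problem.
You should change Kafka's server.properties to specify the IP address in the listener, for example: PLAINTEXT://YOUIP:9093. If you do not, Kafka will use the host name, and if the producer cannot resolve that host name it cannot send messages to Kafka, even though you can telnet to the brokers.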
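
To make the check described in this answer concrete, here is a minimal sketch in plain Java (no Kafka dependency). It flags listener entries that name no host, and it tests whether a host name resolves from the machine it runs on. The class `ListenerHostCheck` and its methods are hypothetical helpers written for illustration; they are not part of the Kafka client API.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of the Kafka client API.
final class ListenerHostCheck {

    // Returns the host part of a listener such as "PLAINTEXT://10.2.0.4:9092",
    // or an empty string when the listener names no host ("PLAINTEXT://:9092").
    static String hostOf(String listener) {
        int schemeEnd = listener.indexOf("://");
        String hostPort = schemeEnd >= 0 ? listener.substring(schemeEnd + 3) : listener;
        int lastColon = hostPort.lastIndexOf(':');
        return lastColon >= 0 ? hostPort.substring(0, lastColon) : hostPort;
    }

    // Collects the listeners that do not name a host explicitly.
    static List<String> withoutExplicitHost(List<String> listeners) {
        List<String> result = new ArrayList<>();
        for (String listener : listeners) {
            String trimmed = listener.trim();
            if (hostOf(trimmed).isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    // True when this machine can resolve the given host name or IP literal.
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Listener values as given in the question.
        List<String> listeners = Arrays.asList(
                "PLAINTEXT://:9092", "PLAINTEXT://:9093", "PLAINTEXT://:9094");
        for (String listener : withoutExplicitHost(listeners)) {
            System.out.println("No explicit host in listener: " + listener);
        }
        // Pass the brokers' host names as arguments and run this on the producer machine.
        for (String host : args) {
            System.out.println(host + (resolves(host) ? " resolves" : " does NOT resolve"));
        }
    }
}
```

Running `main` on the producer machine with the brokers' host names as arguments shows whether the names the brokers hand out can be resolved there, which is a separate question from whether a telnet to MYIP succeeds.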

pod7payv  #3

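This answer shares some insight on the issue. You can increase the request.timeout.ms producer config, which lets the client keep batches queued longer before they expire.
You may also want to look at the batch.size and linger.ms configs and find the values that work best for your case.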
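
As a rough sketch of how those three settings could be applied, the snippet below builds a copy of an existing producer configuration with request.timeout.ms, batch.size and linger.ms overridden. The class `ProducerTimeoutTuning`, the method `tune` and the numeric values are assumptions made for illustration only; the values are placeholders, not recommendations. Only the three config keys come from the Kafka producer.

```java
import java.util.Properties;

// Hypothetical helper for illustration; only the three config keys are Kafka's.
final class ProducerTimeoutTuning {

    // Returns a copy of base with the timeout and batching settings overridden.
    static Properties tune(Properties base, int requestTimeoutMs, int batchSizeBytes, int lingerMs) {
        Properties tuned = new Properties();
        tuned.putAll(base);
        tuned.setProperty("request.timeout.ms", Integer.toString(requestTimeoutMs));
        tuned.setProperty("batch.size", Integer.toString(batchSizeBytes));
        tuned.setProperty("linger.ms", Integer.toString(lingerMs));
        return tuned;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "MYIP:9092,MYIP:9093,MYIP:9094");
        base.setProperty("acks", "all");
        // Placeholder values, chosen only for illustration.
        Properties tuned = tune(base, 60000, 16384, 5);
        tuned.list(System.out);
    }
}
```

The resulting `Properties` object can be passed to `new KafkaProducer<String, byte[]>(...)` once the serializer settings from the question are added. If the brokers cannot be reached at all, a longer timeout only delays the same exception.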
