org.apache.kafka.common.network.invalidreceiveexception:接收无效(大小=30662099大于30662028)

pinkon5k  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(400)

我正在尝试使用kafka sink将数据从flume通道推送到kafka集群,我可以看到相关的数据进入相关的主题,但同时我在kafka日志中过于频繁地观察到下面提到的异常跟踪,

[2017-03-21 16:47:56,250] WARN Unexpected error from /10.X.X.X; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 30662099 larger than 30662028)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at kafka.network.Processor.run(SocketServer.scala:413)
        at java.lang.Thread.run(Thread.java:745)

最初的分析让我找到了我的flume日志,并在其中观察到了下面的异常跟踪,

21 Mar 2017 16:25:32,560 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:229)
        ... 3 more

从第一个堆栈跟踪来看,flume似乎试图推送大小为30662099字节的数据,但是kafka代理的消息接受限制被限制为30662028字节。
我在producer(flume)和broker(kafka)上保持了相似的消息发送和接收大小,即30662028,我担心如果我的flume只发送30662028字节,那么这些额外的字节是什么,它们与我的producer的消息一起累积,形成大小为30662099的最终消息,并导致此消息丢失。
任何帮助都是值得感激的!!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题