flume kafka sink中的org.apache.kafka.common.errors.recordtoolargeexception

x8goxv8g  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(970)

我试图从jms源读取数据并将它们推入kafka主题,在这样做的过程中,几个小时后,我观察到推入kafka主题的频率几乎为零,经过一些初步分析,我在flume日志中发现以下异常。

28 Feb 2017 16:35:44,758 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
        ... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

我的flume将max.request.size的当前设置值(在日志中)显示为1048576,这明显小于1399305,增加此max.request.size可以消除这些异常,但我无法找到更新该值的正确位置。
我的flume.config,

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.channels.c1.type = file
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 100000000
a1.channels.c1.checkpointDir = /data/flume/apache-flume-1.7.0-bin/checkpoint
a1.channels.c1.dataDirs = /data/flume/apache-flume-1.7.0-bin/data

a1.sources.r1.type = jms

a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = true

a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = some context urls
a1.sources.r1.connectionFactory = some_queue
a1.sources.r1.providerURL = some_url 

# a1.sources.r1.providerURL = some_url

a1.sources.r1.destinationType = QUEUE
a1.sources.r1.destinationName = some_queue_name 
a1.sources.r1.userName = some_user
a1.sources.r1.passwordFile= passwd

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = some_kafka_topic
a1.sinks.k1.kafka.bootstrap.servers = some_URL
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 1
a1.sinks.k1.channel = c1

任何帮助都将不胜感激!!

jvidinwx

jvidinwx1#

似乎我已经解决了我的问题;因为怀疑增加了 max.request.size 排除了这个异常,对于更新这样的kafka sink(producer)属性flume提供了如下常量前缀 kafka.producer . 我们可以用任何Kafka属性附加这个常量前缀;
所以我的是, a1.sinks.k1.kafka.producer.max.request.size = 5271988 .

uujelgoq

uujelgoq2#

这种改变必须在Kafka完成。更新kafka producer配置文件 producer.properties 具有更大的值,如

max.request.size=10000000

相关问题