我试图从jms源读取数据并将它们推入kafka主题,在这样做的过程中,几个小时后,我观察到推入kafka主题的频率几乎为零,经过一些初步分析,我在flume日志中发现以下异常。
28 Feb 2017 16:35:44,758 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
我的flume将max.request.size的当前设置值(在日志中)显示为1048576,这明显小于1399305,增加此max.request.size可以消除这些异常,但我无法找到更新该值的正确位置。
我的flume.config,
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.channels.c1.type = file
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 100000000
a1.channels.c1.checkpointDir = /data/flume/apache-flume-1.7.0-bin/checkpoint
a1.channels.c1.dataDirs = /data/flume/apache-flume-1.7.0-bin/data
a1.sources.r1.type = jms
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = some context urls
a1.sources.r1.connectionFactory = some_queue
a1.sources.r1.providerURL = some_url
# a1.sources.r1.providerURL = some_url
a1.sources.r1.destinationType = QUEUE
a1.sources.r1.destinationName = some_queue_name
a1.sources.r1.userName = some_user
a1.sources.r1.passwordFile= passwd
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = some_kafka_topic
a1.sinks.k1.kafka.bootstrap.servers = some_URL
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 1
a1.sinks.k1.channel = c1
任何帮助都将不胜感激!!
2条答案
按热度按时间jvidinwx1#
似乎我已经解决了我的问题;因为怀疑增加了
max.request.size
排除了这个异常,对于更新这样的kafka sink(producer)属性flume提供了如下常量前缀kafka.producer
. 我们可以用任何Kafka属性附加这个常量前缀;所以我的是,
a1.sinks.k1.kafka.producer.max.request.size = 5271988
.uujelgoq2#
这种改变必须在Kafka完成。更新kafka producer配置文件
producer.properties
具有更大的值,如