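My job writes data to Kafka. The Kafka topic has its maximum message size set to 5 MB, so if I try to write any record larger than 5 MB, the producer throws the following exception and the job stops.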
java.lang.Exception: Failed to send data to Kafka: The request included a message larger than the max message size the server will accept.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:350)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
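I have checkpointing configured for the job, so if the job fails, it restarts. The problem is that every time it restarts, it fails on the same record again and goes into an endless fail-and-restart loop. Is there a way to handle this Kafka exception in my code so that it doesn't bring down the whole job?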
1 answer
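Maybe you could put a filter in front of the Kafka sink that detects and drops records that are too large. It's a bit hacky, but it could be simple. Otherwise, I would look into extending FlinkKafkaProducer010 so that it can handle the exception itself.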
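A minimal sketch of the size check behind the filter idea, assuming the job writes String records that the sink serializes as UTF-8 and that no message key is sent (a key would also count toward the limit). `OversizedRecordFilter` and `fits` are hypothetical names, not part of Flink or Kafka. The limit is passed in by the caller; it should sit somewhat below the topic's `max.message.bytes`, because the broker's check also covers the key, headers, and some per-record overhead, so a value that is exactly at the limit can still be rejected.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper for the "filter in front of the Kafka sink" approach.
 * Assumes records are Strings serialized as UTF-8 by the sink and that
 * no key is written (a key would also count toward the broker's limit).
 */
final class OversizedRecordFilter {

    private OversizedRecordFilter() {
        // static helper, not meant to be instantiated
    }

    /**
     * Returns true if the UTF-8 encoding of the record is at most maxBytes long.
     * A null value (a Kafka tombstone) is treated as empty and always fits.
     */
    static boolean fits(String record, int maxBytes) {
        if (maxBytes < 0) {
            throw new IllegalArgumentException("maxBytes must be non-negative");
        }
        if (record == null) {
            return true;
        }
        // Serialize the same way the sink is assumed to, then compare sizes.
        return record.getBytes(StandardCharsets.UTF_8).length <= maxBytes;
    }
}
```

In the job it would go right before the existing Kafka sink, for example `stream.filter(r -> OversizedRecordFilter.fits(r, maxBytes))`, where `maxBytes` is a placeholder for your chosen threshold. Because `FilterFunction` has a single method, a lambda works there. Dropping records silently loses data, so logging the oversized ones or routing them to a Flink side output instead is usually worth the extra code.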