flinkkafkaconsumer无法读取lz4压缩主题

wfsdck30  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(620)

我们有几个flink应用程序阅读Kafka主题,它们工作得很好。但最近我们在现有的flink作业中添加了一个新主题,它在启动时立即失败,并出现以下根错误:

Caused by: org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: net/jpountz/lz4/LZ4Exception
    at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:113)
    at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:256)
    at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:334)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1208)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1245)
    ... 7 more

我发现这个主题有lz4压缩,我猜flink出于某种原因无法使用它。将lz4依赖项直接添加到应用程序中是行不通的,奇怪的是,它在本地运行良好,但在远程集群上失败。
flink运行时版本是1.9.1,我们的应用程序中所有其他依赖项的版本都相同:flink-streaming-java\u 2.11、flink-connector-kafka\u 2.11、flink-java和flink-clients\u 2.11
这可能是因为flink不依赖内部的lz4库吗?

9q78igpj

9q78igpj1#

找到了解决办法。不需要版本升级,也不需要应用程序本身的附加依赖项。我们的解决方法是将lz4库jar直接添加到docker映像中的flink libs文件夹中。之后,lz4压缩的错误消失了。

相关问题