我正在使用Flink-Streaming文件接收器写入传入数据S3桶。我的代码与forRowFormat
选项完美配合。现在我正在尝试设置forBulkFormat
选项以将 parquet 格式的数据写入S3。我的接收器函数如下所示。
private static SinkFunction<Pojo> getS3Sink() {
final StreamingFileSink<Pojo> sink = StreamingFileSink
.forBulkFormat(new Path(s3SinkPath),
ParquetAvroWriters.forSpecificRecord(Pojo.class)
)
.withBucketAssigner(new CustomBucketAssigner())
.build();
return sink;
}
我在IntelliJ上运行这整个设置。当我运行此代码时,我得到以下错误:
java.lang.NoClassDefFoundError:org/apache/Hadoop/Map缩减/库/输出/文件输出格式
在java.lang.ClassLoader.defineClass1(本机方法)中~[?:?]
在java. lang.类加载器中。定义类(类加载器. java:1016)~[?:?]
在java. security.安全类加载器.定义类(安全类加载器. java:174)~[?:?]
在jdk.内部.加载器.内置类加载器.定义类(内置类加载器. java:800)~[?:?]
在jdk.内部.加载程序.内置类加载程序.查找类在类路径上或空(内置类加载程序. java:698)~[?:?]
在jdk.内部.加载器.内置类加载器.加载类或空(内置类加载器. java:621)~[?:?]
在jdk.内部.加载器.内置类加载器.加载类(内置类加载器. java:579)~[?:?]
在jdk.内部.加载器.类加载器$应用程序类加载器.加载类(类加载器. java:178)~[?:?]
在java. lang.类加载器中。加载类(类加载器. java:521)~[?:?]
网站地址:(ParquetWriter.java:285)~[ parquet-hadoop-1.12.0.jar:1.12.0]
网站地址:apache. parquet. hadoop. parquetwriter $www.example.com(parquetwriter. java:641)~[ parquet-hadoop-1.12.0.jar:1.12.0]Builder.build(ParquetWriter.java:641) ~[parquet-hadoop-1.12.0.jar:1.12.0]
网站地址:apache. flink. formats. parquet. avro. parquetavrowriters.创建avroparquetwriter(parquetavrowriters. java:87)~[ flink-Parquet地板_2. 12 - 1. 11. 2. jar:1. 11. 2]
在org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forSpecificRecords$824091b3$1( parquet AvroWriters.java:49)~[ flink-parquet_2. 12 - 1. 11. 2. jar:1. 11. 2]上找到的。
创建(Parquet工厂)~[Parquet工厂_2. 12 - 1. 11. 2. jar:1. 11. 2]
在org. apache. flink. streaming. api. functions. sink. filesystem.大容量存储桶编写器. openNew(大容量存储桶编写器. java:69)~[ flink-streaming-java_2. 12 - 1. 11. 2. jar:1. 11. 2]
文件系统。输出基于流的部分文件编写器$输出基于流的桶编写器。打开新的输入进度文件(输出基于流的部分文件编写器。java:83)~[ flink-流-java_2. 12 - 1. 11. 2. jar:1. 11. 2]
Flink文档没有谈到处理输出格式所需的额外配置,你能帮忙吗?
以下是maven依赖项:
<flink.version>1.11.2</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<avro.version>1.10.2</avro.version>
<flink.format.parquet.version>1.12.0</flink.format.parquet.version>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${flink.format.parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>${flink.version}</version>
</dependency>
谢谢。
1条答案
按热度按时间yjghlzjz1#
我也遇到过这个问题,通过添加一个
org.apache.hadoop:hadoop-mapreduce-client-core
依赖项和org.apache.flink:flink-hadoop-compatibility_2.12
,可以解决这个问题,org.apache.flink:flink-hadoop-compatibility_2.12
将所有必需的hadoop类填充到类路径中。还值得注意的是,flink docs目前建议从
parquet-avro
中排除一些传递依赖项,这可能有助于解决任何冲突。