Flink流媒体文件接收器- ParquetAvroWriters

ubbxdtey  于 2023-02-27  发布在  Apache
关注(0)|答案(1)|浏览(242)

我正在使用Flink-Streaming文件接收器写入传入数据S3桶。我的代码与forRowFormat选项完美配合。现在我正在尝试设置forBulkFormat选项以将 parquet 格式的数据写入S3。我的接收器函数如下所示。

private static SinkFunction<Pojo> getS3Sink() {

   final StreamingFileSink<Pojo> sink = StreamingFileSink
        .forBulkFormat(new Path(s3SinkPath),
                ParquetAvroWriters.forSpecificRecord(Pojo.class)
        )

        .withBucketAssigner(new CustomBucketAssigner())
        .build();
   return sink;
}

我在IntelliJ上运行这整个设置。当我运行此代码时,我得到以下错误:
java.lang.NoClassDefFoundError:org/apache/Hadoop/Map缩减/库/输出/文件输出格式
在java.lang.ClassLoader.defineClass1(本机方法)中~[?:?]
在java. lang.类加载器中。定义类(类加载器. java:1016)~[?:?]
在java. security.安全类加载器.定义类(安全类加载器. java:174)~[?:?]
在jdk.内部.加载器.内置类加载器.定义类(内置类加载器. java:800)~[?:?]
在jdk.内部.加载程序.内置类加载程序.查找类在类路径上或空(内置类加载程序. java:698)~[?:?]
在jdk.内部.加载器.内置类加载器.加载类或空(内置类加载器. java:621)~[?:?]
在jdk.内部.加载器.内置类加载器.加载类(内置类加载器. java:579)~[?:?]
在jdk.内部.加载器.类加载器$应用程序类加载器.加载类(类加载器. java:178)~[?:?]
在java. lang.类加载器中。加载类(类加载器. java:521)~[?:?]
网站地址:(ParquetWriter.java:285)~[ parquet-hadoop-1.12.0.jar:1.12.0]
网站地址:apache. parquet. hadoop. parquetwriter $www.example.com(parquetwriter. java:641)~[ parquet-hadoop-1.12.0.jar:1.12.0]Builder.build(ParquetWriter.java:641) ~[parquet-hadoop-1.12.0.jar:1.12.0]
网站地址:apache. flink. formats. parquet. avro. parquetavrowriters.创建avroparquetwriter(parquetavrowriters. java:87)~[ flink-Parquet地板_2. 12 - 1. 11. 2. jar:1. 11. 2]
在org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forSpecificRecords$824091b3$1( parquet AvroWriters.java:49)~[ flink-parquet_2. 12 - 1. 11. 2. jar:1. 11. 2]上找到的。
创建(Parquet工厂)~[Parquet工厂_2. 12 - 1. 11. 2. jar:1. 11. 2]
在org. apache. flink. streaming. api. functions. sink. filesystem.大容量存储桶编写器. openNew(大容量存储桶编写器. java:69)~[ flink-streaming-java_2. 12 - 1. 11. 2. jar:1. 11. 2]
文件系统。输出基于流的部分文件编写器$输出基于流的桶编写器。打开新的输入进度文件(输出基于流的部分文件编写器。java:83)~[ flink-流-java_2. 12 - 1. 11. 2. jar:1. 11. 2]
Flink文档没有谈到处理输出格式所需的额外配置,你能帮忙吗?
以下是maven依赖项:

<flink.version>1.11.2</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<avro.version>1.10.2</avro.version>
<flink.format.parquet.version>1.12.0</flink.format.parquet.version>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>${flink.format.parquet.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-hadoop</artifactId>
    <version>${flink.version}</version>
</dependency>

谢谢。

yjghlzjz

yjghlzjz1#

我也遇到过这个问题,通过添加一个org.apache.hadoop:hadoop-mapreduce-client-core依赖项和org.apache.flink:flink-hadoop-compatibility_2.12,可以解决这个问题,org.apache.flink:flink-hadoop-compatibility_2.12将所有必需的hadoop类填充到类路径中。
还值得注意的是,flink docs目前建议从parquet-avro中排除一些传递依赖项,这可能有助于解决任何冲突。

相关问题