关于flink溪流汇至hdfs

5ssjco0h  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(378)

我正在编写一个flink代码,其中我正在从本地系统读取一个文件,并使用“writeusingoutputformat”将其写入数据库。
现在我的要求是写入hdfs而不是数据库。
你能帮我在Flink怎么办吗。
注意:hdfs已启动并在本地计算机上运行。

cig3rfwq

cig3rfwq1#

flink提供了hdfs连接器,可用于将数据写入hadoop文件系统支持的任何文件系统。
提供的接收器是一个bucketing接收器,它将数据流划分为包含滚动文件的文件夹。bucketing行为以及写入行为可以使用以下参数进行配置: batch size 以及 batch roll over time interval flink文档给出了以下示例-

DataStream<Tuple2<IntWritable,Text>> input = ...;

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

input.addSink(sink);
bprjcwpo

bprjcwpo2#

在这一点上,较新的流文件接收器可能是比bucketing接收器更好的选择。此描述来自Flink1.6发行说明(请注意,Flink1.7中添加了对s3的支持):
新的streamingfilesink是一个只需一次就可以写入文件系统的接收器,它利用了从以前的bucketingsink中获得的知识。通过集成sink和flink的检查点机制,可以支持一次。新的sink建立在flink自己的文件系统抽象之上,它支持本地文件系统和hdfs,并计划在不久的将来支持s3(现在包含在flink1.7中)。它公开了可插入的文件滚动和压缩策略。除了行编码格式之外,新的streamingfilesink还支持parquet。使用公开的api可以很容易地添加其他批量编码格式,如orc。

相关问题