flink提供了hdfs连接器,可用于将数据写入hadoop文件系统支持的任何文件系统。 提供的接收器是一个bucketing接收器,它将数据流划分为包含滚动文件的文件夹。bucketing行为以及写入行为可以使用以下参数进行配置: batch size 以及 batch roll over time interval flink文档给出了以下示例-
DataStream<Tuple2<IntWritable,Text>> input = ...;
BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
input.addSink(sink);
2条答案
按热度按时间cig3rfwq1#
flink提供了hdfs连接器,可用于将数据写入hadoop文件系统支持的任何文件系统。
提供的接收器是一个bucketing接收器,它将数据流划分为包含滚动文件的文件夹。bucketing行为以及写入行为可以使用以下参数进行配置:
batch size
以及batch roll over time interval
flink文档给出了以下示例-bprjcwpo2#
在这一点上,较新的流文件接收器可能是比bucketing接收器更好的选择。此描述来自Flink1.6发行说明(请注意,Flink1.7中添加了对s3的支持):
新的streamingfilesink是一个只需一次就可以写入文件系统的接收器,它利用了从以前的bucketingsink中获得的知识。通过集成sink和flink的检查点机制,可以支持一次。新的sink建立在flink自己的文件系统抽象之上,它支持本地文件系统和hdfs,并计划在不久的将来支持s3(现在包含在flink1.7中)。它公开了可插入的文件滚动和压缩策略。除了行编码格式之外,新的streamingfilesink还支持parquet。使用公开的api可以很容易地添加其他批量编码格式,如orc。