apache flink-buckeingsink类的用法

dgsult0t  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(601)

我正在使用bucketingsink类编写一个poc来写入hdfs文件。即使数据正在写入hdfs文件,但这些文件仍与hdfs上的“.pending”一起存在。
下面是我正在使用的代码。有人能帮我确定问题并帮我解决吗?

BucketingSink<String> HdfsSink = new BucketingSink<String>("hdfs://xxxx/xxxx/xxxx/Test/");
HdfsSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
HdfsSink.setBatchSize(1024 * 1024 * 2); // this is 2 MB,
HdfsSink.setInactiveBucketCheckInterval(10000L);
HdfsSink.setInactiveBucketThreshold(10000L);
sg2wtvxw

sg2wtvxw1#

嗨,你可以用这个。
嗨,未完成的桶有。挂起的扩展。一旦一个bucket关闭(例如,对于time bucketing,一旦时间结束),文件将被重命名。因为您使用的是nonrollingbucketer,所以文件永远不会关闭。我建议你用datetimebucketer。
作为旁注:我建议您增加一点检查点间隔。123毫秒是非常频繁的,应用程序看起来并不是非常关键的延迟。像2000毫秒这样的值可能更合适。

c0vxltue

c0vxltue2#

我发现文件保留为.pending的真正原因是。。。因为我没有启用检查点。一旦我启用了检查点。。。文件正在成功关闭,但未显示为.pending。
您可以通过设置 env.enableCheckpointing(<duration>) 请检查url@https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/bucketingsink.html 更多细节。

相关问题