我正在使用bucketingsink类编写一个poc来写入hdfs文件。即使数据正在写入hdfs文件,但这些文件仍与hdfs上的“.pending”一起存在。
下面是我正在使用的代码。有人能帮我确定问题并帮我解决吗?
BucketingSink<String> HdfsSink = new BucketingSink<String>("hdfs://xxxx/xxxx/xxxx/Test/");
HdfsSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
HdfsSink.setBatchSize(1024 * 1024 * 2); // this is 2 MB,
HdfsSink.setInactiveBucketCheckInterval(10000L);
HdfsSink.setInactiveBucketThreshold(10000L);
2条答案
按热度按时间sg2wtvxw1#
嗨,你可以用这个。
嗨,未完成的桶有。挂起的扩展。一旦一个bucket关闭(例如,对于time bucketing,一旦时间结束),文件将被重命名。因为您使用的是nonrollingbucketer,所以文件永远不会关闭。我建议你用datetimebucketer。
作为旁注:我建议您增加一点检查点间隔。123毫秒是非常频繁的,应用程序看起来并不是非常关键的延迟。像2000毫秒这样的值可能更合适。
c0vxltue2#
我发现文件保留为.pending的真正原因是。。。因为我没有启用检查点。一旦我启用了检查点。。。文件正在成功关闭,但未显示为.pending。
您可以通过设置
env.enableCheckpointing(<duration>)
请检查url@https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/bucketingsink.html 更多细节。