如何在ApacheFlink中为bucketingsink生成的最终完成的文件添加后缀?

xtfmy6hx  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(501)

我用apache flink在hdfs上创建了一些归档数据文件,生成的文件名有part-{parallel task}-{count}这样的模式,但我期望的应该有“.gz”后缀,可以由apache spark直接加载。
我在ApacheFlink中找不到任何api来为bucketingsink生成的最终完成文件添加后缀,但只能为inprogress、pending和validlength状态添加后缀。有人能帮忙吗?hdfs连接器和javaapi

c7rzv4ha

c7rzv4ha1#

据我所知,没有使用默认bucketingsink添加后缀的选项。
一个选项是不使用检查点,并将挂起的后缀设置为所需的后缀。但由于检查点在大多数情况下都是可取的,所以这并不是最佳的。
我的解决方案是创建一个bucketingsinkwithsuffix实现,它几乎是默认bucketingsink的精确副本。唯一需要更改的是为后缀添加一个成员变量,后缀可以在构造函数中设置,并调整创建基路径的方式。
以下是我对构造函数的实现:

public BucketingSinkWithSuffix(String basePath, String suffix) {
    this.basePath = basePath;
    this.bucketer = new DateTimeBucketer<>();
    this.writerTemplate = new StringWriter<>();
    this.partSuffix = suffix;
}

以及用于生成基本路径(第523和528行):

partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter + partSuffix);

相关问题