apache flink的zip压缩输入

f2uvfpb9  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(630)

我需要在apache flink中读取和处理zip存档中的特定文件。
在文件里,我发现
flink当前支持输入文件的透明解压缩,如果这些文件被标记为适当的文件扩展名。
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/batch/#read-压缩文件
在apache flink中进行动态解压缩时是否可能处理它?

rhfm7lfc

rhfm7lfc1#

这个 FileInputFormat 将读取压缩文件委托给 GZIPInputStream ,解压时将返回部分解压数据。

6ljaweal

6ljaweal2#

我想和大家分享我同时实现的解决方案。
所以,在创造了我自己的 InputFormat 我在 open() 方法:

@Override
public void open(final FileInputSplit ignored) throws IOException {
    ...
    final XMLInputFactory xmlif = XMLInputFactory.newInstance();
    final XMLStreamReader xmlr = xmlif.createXMLStreamReader(filePath.toString(),
              InputFormatUtil.readFileWithinZipArchive(filePath, nestedXmlFileName));
    while (xmlr.hasNext()) {
    ...
}

在哪里实施 readFileWithinZipArchive(...) 是:

public static InputStream readFileWithinZipArchive(final Path zipPath, final String filename) throws IOException {
    // using org.apache.flink.core.fs.Path for getting the InputStream from the (remote) zip archive
    final InputStream zipInputStream = zipPath.getFileSystem().open(zipPath);
    // generating a temporary local copy of the zip file
    final File tmpFile = stream2file(zipInputStream);
    // then using java.util.zip.ZipFile for extracting the InputStream for the specific file within the zip archive
    final ZipFile zipFile = new ZipFile(tmpFile);
    return zipFile.getInputStream(zipFile.getEntry(filename));
}

相关问题