@Override
public void open(final FileInputSplit ignored) throws IOException {
    ...
    final XMLInputFactory xmlif = XMLInputFactory.newInstance();
    final XMLStreamReader xmlr = xmlif.createXMLStreamReader(filePath.toString(),
            InputFormatUtil.readFileWithinZipArchive(filePath, nestedXmlFileName));
    while (xmlr.hasNext()) {
        ...
    }
where readFileWithinZipArchive(...) is implemented as:
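public static InputStream readFileWithinZipArchive(final Path zipPath, final String filename) throws IOException {
    final File tmpFile;
    // using org.apache.flink.core.fs.Path for getting the InputStream from the (remote) zip archive;
    // the remote stream is closed as soon as the local copy exists
    try (InputStream zipInputStream = zipPath.getFileSystem().open(zipPath)) {
        // generating a temporary local copy of the zip file (stream2file is the author's own helper, not shown here)
        tmpFile = stream2file(zipInputStream);
    }
    // then using java.util.zip.ZipFile for extracting the InputStream for the specific file within the zip archive
    final ZipFile zipFile = new ZipFile(tmpFile);
    final ZipEntry entry = zipFile.getEntry(filename);
    if (entry == null) { // getEntry returns null for a missing entry; fail with a clear message instead of an NPE
        zipFile.close();
        throw new FileNotFoundException(filename + " not found in " + zipPath);
    }
    // the ZipFile has to stay open while the caller reads the returned stream
    return zipFile.getInputStream(entry);
}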
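
The helper above needs a temporary local copy because java.util.zip.ZipFile only works on local files. As a rough alternative sketch (not part of the original answer), the entry could also be located by scanning the archive sequentially with java.util.zip.ZipInputStream, which works directly on any InputStream. The class name ZipEntryStreams and the method openEntry are hypothetical names chosen for illustration; in the Flink case the archive stream would presumably come from zipPath.getFileSystem().open(zipPath).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical helper class, for illustration only.
public class ZipEntryStreams {

    /**
     * Scans the archive sequentially and returns a stream positioned at the
     * start of the named entry. Reads on the returned stream report
     * end-of-stream at the end of that entry; closing it closes the archive.
     */
    public static InputStream openEntry(final InputStream archive, final String entryName) throws IOException {
        final ZipInputStream zin = new ZipInputStream(archive);
        try {
            ZipEntry entry;
            while ((entry = zin.getNextEntry()) != null) {
                if (entry.getName().equals(entryName)) {
                    return zin;
                }
            }
        } catch (IOException | RuntimeException e) {
            zin.close(); // do not leak the archive stream on failure
            throw e;
        }
        zin.close();
        throw new FileNotFoundException(entryName + " not found in archive");
    }

    public static void main(final String[] args) throws IOException {
        // Build a small in-memory archive with two entries.
        final ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ZipOutputStream zout = new ZipOutputStream(buf)) {
            zout.putNextEntry(new ZipEntry("other.txt"));
            zout.write("skip me".getBytes(StandardCharsets.UTF_8));
            zout.closeEntry();
            zout.putNextEntry(new ZipEntry("data.xml"));
            zout.write("<root/>".getBytes(StandardCharsets.UTF_8));
            zout.closeEntry();
        }
        // Read back only the second entry (readAllBytes requires Java 9+).
        try (InputStream in = openEntry(new ByteArrayInputStream(buf.toByteArray()), "data.xml")) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}
```

The trade-off is that ZipInputStream has to read through every entry that precedes the wanted one, whereas ZipFile can jump to the entry via the central directory; in exchange, no temporary file has to be created or cleaned up.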
2 answers

Answer 1 (rhfm7lfc)
FileInputFormat delegates reading compressed files to GZIPInputStream, which returns partially decompressed data while decompressing.

Answer 2 (6ljaweal)
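I would like to share the solution I implemented in the meantime.
After creating my own InputFormat, I read the nested XML file inside its open() method through a helper, readFileWithinZipArchive(...). Both listings, the open() method and the helper, are the ones shown at the top of this post.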