我正在尝试使用scala或java读取spark.7z文件。我没有找到任何合适的方法或功能。
对于zip文件,我可以读取zipinputstream类接受的输入流,但是对于7z文件,sevenzfile类不接受任何输入流。https://commons.apache.org/proper/commons-compress/javadocs/api-1.16/org/apache/commons/compress/archivers/sevenz/sevenzfile.html
邮政编码
spark.sparkContext.binaryFiles("fileName").flatMap{case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
Stream.continually(zis.getNextEntry)
.takeWhile(_ != null)
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}}
我正在为7z文件尝试类似的代码
spark.sparkContext.binaryFiles(""filename"").flatMap{case (name: String, content: PortableDataStream) =>
val zis = new SevenZFile(content.open)
Stream.continually(zis.getNextEntry)
.takeWhile(_ != null)
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}}
但sevenzfile不接受这些格式。正在寻找创意。
如果文件位于本地文件系统中,则以下解决方案可行,但我的文件位于hdfs中
本地文件系统代码
public static void decompress(String in, File destination) throws IOException {
SevenZFile sevenZFile = new SevenZFile(new File(in));
SevenZArchiveEntry entry;
while ((entry = sevenZFile.getNextEntry()) != null){
if (entry.isDirectory()){
continue;
}
File curfile = new File(destination, entry.getName());
File parent = curfile.getParentFile();
if (!parent.exists()) {
parent.mkdirs();
}
FileOutputStream out = new FileOutputStream(curfile);
byte[] content = new byte[(int) entry.getSize()];
sevenZFile.read(content, 0, content.length);
out.write(content);
out.close();
}
}
经过这么多年的星火进化,应该有一个简单的方法来做到这一点。
1条答案
按热度按时间lb3vh1jj1#
而不是使用
java.io.File
-你可以尝试SeekableByteChannel
方法。可以使用SeekableinMemoryByte通道读取字节数组。所以,只要你能从s3或者其他什么地方获取7zip文件并把它们作为字节数组传递出去,你就应该没事了。
尽管如此,spark确实不太适合处理zip和7zip文件。我可以告诉你,根据我的个人经验,一旦文件太大,spark的执行者无法处理,它就会严重失败。
类似apachenifi的东西可以更好地扩展和处理归档文件。fwiw,我目前正在处理一个大型数据转储,它让我经常处理50gbtarball,其中包含数百万个文件,nifi处理它们非常优雅。