spark读取.7z文件

u2nhd7ah  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(574)

我正在尝试使用scala或java读取spark.7z文件。我没有找到任何合适的方法或功能。
对于zip文件,我可以读取zipinputstream类接受的输入流,但是对于7z文件,sevenzfile类不接受任何输入流。https://commons.apache.org/proper/commons-compress/javadocs/api-1.16/org/apache/commons/compress/archivers/sevenz/sevenzfile.html
邮政编码

spark.sparkContext.binaryFiles("fileName").flatMap{case (name: String, content: PortableDataStream) =>
        val zis = new ZipInputStream(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}

我正在为7z文件尝试类似的代码

spark.sparkContext.binaryFiles(""filename"").flatMap{case (name: String, content: PortableDataStream) =>
        val zis = new SevenZFile(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}

但sevenzfile不接受这些格式。正在寻找创意。
如果文件位于本地文件系统中,则以下解决方案可行,但我的文件位于hdfs中
本地文件系统代码

public static void decompress(String in, File destination) throws IOException {
        SevenZFile sevenZFile = new SevenZFile(new File(in));
        SevenZArchiveEntry entry;
        while ((entry = sevenZFile.getNextEntry()) != null){
            if (entry.isDirectory()){
                continue;
            }
            File curfile = new File(destination, entry.getName());
            File parent = curfile.getParentFile();
            if (!parent.exists()) {
                parent.mkdirs();
            }
            FileOutputStream out = new FileOutputStream(curfile);
            byte[] content = new byte[(int) entry.getSize()];
            sevenZFile.read(content, 0, content.length);
            out.write(content);
            out.close();
        }
    }

经过这么多年的星火进化,应该有一个简单的方法来做到这一点。

lb3vh1jj

lb3vh1jj1#

而不是使用 java.io.File -你可以尝试 SeekableByteChannel 方法。
可以使用SeekableinMemoryByte通道读取字节数组。所以,只要你能从s3或者其他什么地方获取7zip文件并把它们作为字节数组传递出去,你就应该没事了。
尽管如此,spark确实不太适合处理zip和7zip文件。我可以告诉你,根据我的个人经验,一旦文件太大,spark的执行者无法处理,它就会严重失败。
类似apachenifi的东西可以更好地扩展和处理归档文件。fwiw,我目前正在处理一个大型数据转储,它让我经常处理50gbtarball,其中包含数百万个文件,nifi处理它们非常优雅。

相关问题