为什么在pyspark中收集“二进制文件”时它们是空的?

mlmc2os5  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(422)

我在同一个文件夹中的hdfs上有两个zip文件: /user/path-to-folder-with-zips/ .
我将其传递给pyspark中的“二进制文件”:

zips = sc.binaryFiles('/user/path-to-folder-with-zips/')

我正在尝试解压zip文件并对其中的文本文件执行操作,所以我尝试在处理rdd时查看内容。我是这样做的:

zips_collected = zips.collect()

但是,当我这么做的时候,它给出了一个空列表:

>> zips_collected
[]

我知道拉链不是空的-它们有文本文件。文件上说
每个文件作为单个记录读取,并以键-值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。
我做错什么了?我知道我不能查看文件的内容,因为它是压缩的,因此是二进制的。但是,我至少应该能看到一些东西。为什么它什么都不退?
每个zip文件可以有多个文件,但内容总是这样:

rownum|data|data|data|data|data
rownum|data|data|data|data|data
rownum|data|data|data|data|data
yi0zb3m4

yi0zb3m41#

我假设每个zip文件都包含一个文本文件(多个文本文件的代码很容易更改)。您需要首先通过读取zip文件的内容 io.BytesIO 在逐行处理之前。解决方案松散地基于https://stackoverflow.com/a/36511190/234233.

import io
import gzip

def zip_extract(x):
    """Extract *.gz file in memory for Spark"""
    file_obj = gzip.GzipFile(fileobj=io.BytesIO(x[1]), mode="r")
    return file_obj.read()

zip_data = sc.binaryFiles('/user/path-to-folder-with-zips/*.zip')
results = zip_data.map(zip_extract) \
                  .flatMap(lambda zip_file: zip_file.split("\n")) \
                  .map(lambda line: parse_line(line))
                  .collect()

相关问题