如何在spark中处理后发布单个Parquet文件

ia2d9nvy  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(287)

我有一个有10万张唱片的Parquet档案。我希望以并行方式处理所有记录,并且我的处理将生成更多的列。因此,如果我的Parquet文件有3列和100000条记录,我的文件如下所示-

col A    col B     colC
aa         bb       cc
aa1        bb1      cc1

在得到并行处理后,我想要一个具有相同列和3个以上列的新Parquet文件。我的输出是这样的-

col A    col B     colC     colD      colE       colF
aa         bb       cc       dd        ee          ff
aa1        bb1      cc1      dd1       ee1         ff1

我想知道-
在spark节点中并行运行之后,如何将所有结果合并到一个parquet文件中?
如何向现有文件中添加更多列?
任何帮助都将不胜感激。

5q4ezhmt

5q4ezhmt1#

在这种情况下,我建议您不要从apachespark创建一个文件,这将破坏该工具的性能。如果您在hdfs中进行存储,我建议您在完成mapreduce后运行以下命令:

hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

或者,您可以使用一个函数来合并这些文件,如下所示:

textFiles = [f for f in listdir(outputPath) if isfile(join(outputPath, f))]

files = [f for f in textFiles if f.startswith('part-')]
files.sort()

filePath = outputPath + '/consolidate_file'
outputFile = open(filePath, 'w')

for i in files:
    tmpFile = open(outputPath + '/' + i, 'r')
    outputFile.write(tmpFile.read())

outputFile.close()
bis0qfac

bis0qfac2#

您可以随时使用

df.repartition(1)

如果您只是在试验,或者使用小数据集(长度合理的100000条记录可以被视为小数据),它就可以完成这项工作。但是,如果您希望应用程序能够优雅地扩展,那么应该避免这种激进的重新分区并编写几个Parquet文件,有几个输出文件有什么不好的呢?
关于第二个问题,parquet文件被设计为不可变的,因此最直接的方法是读取数据,添加相关列并将数据保存回parquet(覆盖现有文件,或将修改后的数据保存到新目录)。

相关问题