我有一个场景,我需要从位于不同位置和不同模式的s3 bucket中读取许多文件(csv或parquet格式)。
我这样做的目的是从不同的s3位置提取所有元数据信息,并将其保存为Dataframe,然后在s3中将其保存为csv文件。这里的问题是,我有很多s3位置来读取文件(分区)。
s3://myRawbucket/source1/filename1/year/month/day/16/f1.parquet
s3://myRawbucket/source2/filename2/year/month/day/16/f2.parquet
s3://myRawbucket/source3/filename3/year/month/day/16/f3.parquet
s3://myRawbucket/source100/filename100/year/month/day/16/f100.parquet
s3://myRawbucket/source150/filename150/year/month/day/16/f150.parquet and .......... so on
我所要做的就是使用spark代码来读取这些文件(大约200个),如果需要的话,应用一些转换并提取头信息、计数信息、s3位置信息和数据类型。
什么是有效的方法来读取所有这些文件(差分模式),并使用spark代码(Dataframe)处理它,并将其保存为s3 bucket中的csv?请耐心等待,因为我是spark world的新手。我使用的是python(Pyspark)
1条答案
按热度按时间lp0sw83n1#
我认为你需要做的是使用一些Python/Pandas逻辑,并使用Spark并行化这些工作。Fugue非常适合。你可以将你的逻辑移植到Spark,只需要非常少的代码更改。让我们先考虑使用Python和Pandas定义逻辑,然后再将其带到Spark。
首先设置:
我们需要一个包含所有文件的小DataFrame来编排Spark的作业。
现在我们可以创建一个函数来保存每个文件的逻辑,注意,当我们把它带到Spark时,我们将为每个文件路径创建一个"作业",我们的函数一次只需要能够处理一个文件。
我们可以测试代码:
这就给了我们:
现在我们可以使用Fugue把它带到Spark,我们只需要
transform()
函数把逻辑带到Spark,模式是Spark的一个需求。输出将为: