python Spark中从不同aws S3并行阅读多个文件

e4yzc0pl  于 2023-01-24  发布在  Python
关注(0)|答案(1)|浏览(158)

我有一个场景,我需要从位于不同位置和不同模式的s3 bucket中读取许多文件(csv或parquet格式)。
我这样做的目的是从不同的s3位置提取所有元数据信息,并将其保存为Dataframe,然后在s3中将其保存为csv文件。这里的问题是,我有很多s3位置来读取文件(分区)。

s3://myRawbucket/source1/filename1/year/month/day/16/f1.parquet
s3://myRawbucket/source2/filename2/year/month/day/16/f2.parquet
s3://myRawbucket/source3/filename3/year/month/day/16/f3.parquet
s3://myRawbucket/source100/filename100/year/month/day/16/f100.parquet
s3://myRawbucket/source150/filename150/year/month/day/16/f150.parquet    and .......... so on

我所要做的就是使用spark代码来读取这些文件(大约200个),如果需要的话,应用一些转换并提取头信息、计数信息、s3位置信息和数据类型。
什么是有效的方法来读取所有这些文件(差分模式),并使用spark代码(Dataframe)处理它,并将其保存为s3 bucket中的csv?请耐心等待,因为我是spark world的新手。我使用的是python(Pyspark)

lp0sw83n

lp0sw83n1#

我认为你需要做的是使用一些Python/Pandas逻辑,并使用Spark并行化这些工作。Fugue非常适合。你可以将你的逻辑移植到Spark,只需要非常少的代码更改。让我们先考虑使用Python和Pandas定义逻辑,然后再将其带到Spark。
首先设置:

import pandas as pd

df = pd.DataFrame({"x": [1,2,3]})
df.to_parquet("/tmp/1.parquet")
df.to_parquet("/tmp/2.parquet")
df.to_parquet("/tmp/3.parquet")

我们需要一个包含所有文件的小DataFrame来编排Spark的作业。

file_paths = pd.DataFrame({"path": ["/tmp/1.parquet",
                                    "/tmp/2.parquet",
                                    "/tmp/3.parquet"]})

现在我们可以创建一个函数来保存每个文件的逻辑,注意,当我们把它带到Spark时,我们将为每个文件路径创建一个"作业",我们的函数一次只需要能够处理一个文件。

def process(df:pd.DataFrame) -> pd.DataFrame:
    path = df.iloc[0]['path']
    
    tmp = pd.read_parquet(path)
    
    # transformation
    tmp['y'] = tmp['x'] + 1
    
    # save
    tmp.to_parquet(path)
    
    # summary stats
    return pd.DataFrame({"path": [path],
                         'count': [tmp.shape[0]]})

我们可以测试代码:

process(file_paths)

这就给了我们:

path    count
/tmp/1.parquet  3

现在我们可以使用Fugue把它带到Spark,我们只需要transform()函数把逻辑带到Spark,模式是Spark的一个需求。

import fugue.api as fa
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

out = fa.transform(file_paths, process, schema="path:str,count:int", engine=spark)

# out is a Spark DataFrame
out.show()

输出将为:

+--------------+-----+
|          path|count|
+--------------+-----+
|/tmp/1.parquet|    3|
|/tmp/2.parquet|    3|
|/tmp/3.parquet|    3|
+--------------+-----+

相关问题