使用pyspark在palantir foundry目录中联合60个 Dataframe

xxls0lw8  于 2023-03-11  发布在  Spark
关注(0)|答案(1)|浏览(183)

我有一个60+铸造数据集在它的目录。我只是要读取所有的数据集和联合成一个单一的 Dataframe
路径=输入(“我的消费新文件P2P/2020/”)

Dataset1
Dataset2
Dataset3
...
Dataset60

输出(“MySpend新文件P2P/2020/UnionAll”)

xxe27gdn

xxe27gdn1#

更专业的是:

  • 您可以在转换中利用union_many(更详细的example here)并手动列出其输入。请注意,您可以使用数据沿袭快速复制粘贴所有数据集(select datasets > top right > "view histogram" icon > "copy paths")的路径。
from transforms.api import transform_df, Input, Output
from transforms.verbs import dataframes as D

@transform_df(
    Output("/path/to/dataset/unioned"),
    source_df_1=Input("/path/to/dataset/one"),
    source_df_2=Input("/path/to/dataset/two"),
    source_df_3=Input("/path/to/dataset/three"),
)
def compute(source_df_1, source_df_2, source_df_3):
    return D.union_many(
        source_df_1,
        source_df_2,
        source_df_3,
    )
  • 同样的方式,但更容易复制粘贴,您可以参数化您的转换使用一个数组的路径作为输入
from transforms.verbs import dataframes as D
from transforms.api import transform_df, Input, Output

# Configure the number of datasets to generate
list_datasets_paths = [
    "/path/to/dataset/one",
    "/path/to/dataset/two",
    "/path/to/dataset/three"]

# Convert the list of paths in a dict of Input()
input_dict = {}
for dataset_path in list_datasets_paths:
    input_dict[dataset_path.split("/")[-1]] = Input(dataset_path)

# Provide the dict of Input() to the transform
@transform_df(
    Output("/path/to/dataset/unioned"),
    **input_dict
)
def compute_2(**inputs_dataframes):
    # Create a list of dataframes from the input dict
    dataframes_list = inputs_dataframes.values()
    # Union the list of dataframes
    return D.union_many(*dataframes_list)
  • 如果数据集随着时间的推移而变化,你可以使用Logic Flows,它将列出给定输入文件夹中资源的rid(资源标识符),并打开一个新的拉取请求,该请求包含一个包含这些rid/路径的文件。

注意:您还可以使用其他工具来构建管道,从而构建联合数据集,如Pipeline Builder/docs

相关问题