有没有一种方法可以使用pandas to_parquet分区覆盖现有数据?

bvk5enib  于 2023-09-29  发布在  其他
关注(0)|答案(4)|浏览(168)

我正在使用pandas编写一个parquet文件,使用to_parquet函数进行分区。范例:

df.to_parquet('gs://bucket/path', partition_cols=['key'])

问题是每次我运行代码时。它在分区中添加了一个新的parquet文件,当您读取数据时,您将获得每次运行脚本时的所有数据。本质上,数据每次都追加。
有没有一种方法可以在每次使用pandas写入时覆盖数据?

4jb9z9bj

4jb9z9bj1#

是的,有。你需要阅读pandas文档,你会看到to_parquet支持**kwargs并默认使用engine:pyarrow。就这样,你去了Pyarrow Docs。你会看到有两种方法可以做到这一点。第一,使用partition_filename_cb,它需要遗留支持,并且将被删除。第二,使用basename_template,这是一种新的方法。这是因为运行可调用的/lambda来命名每个分区的性能问题。你需要传递一个字符串:"string_{i}"。仅在旧版支持关闭的情况下工作。保存的文件将为“string_0”、“string_1”...你不能同时使用两者。

def write_data(
    df: pd.DataFrame,
    path: str,
    file_format="csv",
    comp_zip="snappy",
    index=False,
    partition_cols: list[str] = None,
    basename_template: str = None,
    storage_options: dict = None,
    **kwargs,
) -> None:
    getattr(pd.DataFrame, f"to_{file_format}")(
        df,
        f"{path}.{file_format}",
        compression=comp_zip,
        index=index,
        partition_cols=partition_cols,
        basename_template=basename_template,
        storage_options={"token": creds},
        **kwargs,
    )

试试这个.

lf5gs5x2

lf5gs5x22#

我发现dask对阅读和书写 parquet 很有帮助。它默认的文件名写(你可以改变),并将取代 parquet 文件,如果你使用相同的名称,我相信这是你正在寻找的。您可以通过将“append”设置为True来将数据附加到分区,这对我来说更直观,或者您可以将“overwrite”设置为True,这将在写入文件之前删除分区/文件夹中的所有文件。通过在读取时将分区列包括在 Dataframe 中,阅读parquet也能很好地工作。
https://docs.dask.org/en/stable/generated/dask.dataframe.to_parquet.html
请参阅下面的一些代码,我用来满足自己的行为dask.dataframe.to_parquet:

import pandas as pd
from dask import dataframe as dd
import numpy as np

dates = pd.date_range("2015-01-01", "2022-06-30")
df_len = len(dates)
df_1 = pd.DataFrame(np.random.randint(0, 1000, size=(df_len, 1)), columns=["value"])
df_2 = pd.DataFrame(np.random.randint(0, 1000, size=(df_len, 1)), columns=["value"])

df_1["date"] = dates
df_1["YEAR"] = df_1["date"].dt.year
df_1["MONTH"] = df_1["date"].dt.month

df_2["date"] = dates
df_2["YEAR"] = df_2["date"].dt.year
df_2["MONTH"] = df_2["date"].dt.month

ddf_1 = dd.from_pandas(df_1, npartitions=1)
ddf_2 = dd.from_pandas(df_2, npartitions=1)

name_function = lambda x: f"monthly_data_{x}.parquet"

ddf_1.to_parquet(
    "dask_test_folder",
    name_function=name_function,
    partition_on=["YEAR", "MONTH"],
    write_index=False,
)

print(ddf_1.head())
ddf_first_write = dd.read_parquet("dask_test_folder/YEAR=2015/MONTH=1")
print(ddf_first_write.head())

ddf_2.to_parquet(
    "dask_test_folder",
    name_function=name_function,
    partition_on=["YEAR", "MONTH"],
    write_index=False,
)

print(ddf_2.head())
ddf_second_write = dd.read_parquet("dask_test_folder/YEAR=2015/MONTH=1")
print(ddf_second_write.head())
kpbwa7wx

kpbwa7wx3#

existing_data_behaviordelete_matching一起使用。例如:

df.to_parquet(
    'gs://bucket/path', 
    partition_cols=['key'], 
    existing_data_behavior='delete_matching'
)

pyarrow docs
“delete_matching”在编写分区数据集时很有用。第一次遇到每个分区目录时,整个目录将被删除。这允许您完全覆盖旧分区。

6kkfgxo0

6kkfgxo04#

如果您使用旧版本,并且需要使用参数partition_filename_cb,则使用方法如下:

df.to_parquet('gs://bucket/path', partition_cols=['key'], partition_filename_cb="constant_{}".format)

您需要传递format函数,以便它是一个可调用函数,返回给定键的文件名字符串。

相关问题