我正在使用pandas编写一个parquet文件,使用to_parquet函数进行分区。范例:
df.to_parquet('gs://bucket/path', partition_cols=['key'])
问题是每次我运行代码时。它在分区中添加了一个新的parquet文件,当您读取数据时,您将获得每次运行脚本时的所有数据。本质上,数据每次都追加。有没有一种方法可以在每次使用pandas写入时覆盖数据?
4jb9z9bj1#
是的,有。你需要阅读pandas文档,你会看到to_parquet支持**kwargs并默认使用engine:pyarrow。就这样,你去了Pyarrow Docs。你会看到有两种方法可以做到这一点。第一,使用partition_filename_cb,它需要遗留支持,并且将被删除。第二,使用basename_template,这是一种新的方法。这是因为运行可调用的/lambda来命名每个分区的性能问题。你需要传递一个字符串:"string_{i}"。仅在旧版支持关闭的情况下工作。保存的文件将为“string_0”、“string_1”...你不能同时使用两者。
"string_{i}"
def write_data( df: pd.DataFrame, path: str, file_format="csv", comp_zip="snappy", index=False, partition_cols: list[str] = None, basename_template: str = None, storage_options: dict = None, **kwargs, ) -> None: getattr(pd.DataFrame, f"to_{file_format}")( df, f"{path}.{file_format}", compression=comp_zip, index=index, partition_cols=partition_cols, basename_template=basename_template, storage_options={"token": creds}, **kwargs, )
试试这个.
lf5gs5x22#
我发现dask对阅读和书写 parquet 很有帮助。它默认的文件名写(你可以改变),并将取代 parquet 文件,如果你使用相同的名称,我相信这是你正在寻找的。您可以通过将“append”设置为True来将数据附加到分区,这对我来说更直观,或者您可以将“overwrite”设置为True,这将在写入文件之前删除分区/文件夹中的所有文件。通过在读取时将分区列包括在 Dataframe 中,阅读parquet也能很好地工作。https://docs.dask.org/en/stable/generated/dask.dataframe.to_parquet.html请参阅下面的一些代码,我用来满足自己的行为dask.dataframe.to_parquet:
import pandas as pd from dask import dataframe as dd import numpy as np dates = pd.date_range("2015-01-01", "2022-06-30") df_len = len(dates) df_1 = pd.DataFrame(np.random.randint(0, 1000, size=(df_len, 1)), columns=["value"]) df_2 = pd.DataFrame(np.random.randint(0, 1000, size=(df_len, 1)), columns=["value"]) df_1["date"] = dates df_1["YEAR"] = df_1["date"].dt.year df_1["MONTH"] = df_1["date"].dt.month df_2["date"] = dates df_2["YEAR"] = df_2["date"].dt.year df_2["MONTH"] = df_2["date"].dt.month ddf_1 = dd.from_pandas(df_1, npartitions=1) ddf_2 = dd.from_pandas(df_2, npartitions=1) name_function = lambda x: f"monthly_data_{x}.parquet" ddf_1.to_parquet( "dask_test_folder", name_function=name_function, partition_on=["YEAR", "MONTH"], write_index=False, ) print(ddf_1.head()) ddf_first_write = dd.read_parquet("dask_test_folder/YEAR=2015/MONTH=1") print(ddf_first_write.head()) ddf_2.to_parquet( "dask_test_folder", name_function=name_function, partition_on=["YEAR", "MONTH"], write_index=False, ) print(ddf_2.head()) ddf_second_write = dd.read_parquet("dask_test_folder/YEAR=2015/MONTH=1") print(ddf_second_write.head())
kpbwa7wx3#
将existing_data_behavior与delete_matching一起使用。例如:
existing_data_behavior
delete_matching
df.to_parquet( 'gs://bucket/path', partition_cols=['key'], existing_data_behavior='delete_matching' )
pyarrow docs:“delete_matching”在编写分区数据集时很有用。第一次遇到每个分区目录时,整个目录将被删除。这允许您完全覆盖旧分区。
6kkfgxo04#
如果您使用旧版本,并且需要使用参数partition_filename_cb,则使用方法如下:
partition_filename_cb
df.to_parquet('gs://bucket/path', partition_cols=['key'], partition_filename_cb="constant_{}".format)
您需要传递format函数,以便它是一个可调用函数,返回给定键的文件名字符串。
4条答案
按热度按时间4jb9z9bj1#
是的,有。你需要阅读pandas文档,你会看到to_parquet支持**kwargs并默认使用engine:pyarrow。就这样,你去了Pyarrow Docs。你会看到有两种方法可以做到这一点。第一,使用partition_filename_cb,它需要遗留支持,并且将被删除。第二,使用basename_template,这是一种新的方法。这是因为运行可调用的/lambda来命名每个分区的性能问题。你需要传递一个字符串:
"string_{i}"
。仅在旧版支持关闭的情况下工作。保存的文件将为“string_0”、“string_1”...你不能同时使用两者。试试这个.
lf5gs5x22#
我发现dask对阅读和书写 parquet 很有帮助。它默认的文件名写(你可以改变),并将取代 parquet 文件,如果你使用相同的名称,我相信这是你正在寻找的。您可以通过将“append”设置为True来将数据附加到分区,这对我来说更直观,或者您可以将“overwrite”设置为True,这将在写入文件之前删除分区/文件夹中的所有文件。通过在读取时将分区列包括在 Dataframe 中,阅读parquet也能很好地工作。
https://docs.dask.org/en/stable/generated/dask.dataframe.to_parquet.html
请参阅下面的一些代码,我用来满足自己的行为dask.dataframe.to_parquet:
kpbwa7wx3#
将
existing_data_behavior
与delete_matching
一起使用。例如:pyarrow docs:
“delete_matching”在编写分区数据集时很有用。第一次遇到每个分区目录时,整个目录将被删除。这允许您完全覆盖旧分区。
6kkfgxo04#
如果您使用旧版本,并且需要使用参数
partition_filename_cb
,则使用方法如下:您需要传递format函数,以便它是一个可调用函数,返回给定键的文件名字符串。