pandas 跨任务 Dataframe 的分区并行化方法

0sgqnhkj  于 2023-03-21  发布在  其他
关注(0)|答案(1)|浏览(116)

我正在进入dask,并希望使用dask.distribution库来并行计算。考虑以下示例 Dataframe :

import pandas as pd
import numpy as np
import dask.dataframe as dd

N = 100000000
# init pandas data frame
df = pd.DataFrame(data={
    "x1": np.random.choice([0, 1, np.nan], size=N, p=[.45, .3, .25]),
    "x2": np.random.choice([0, 1, np.nan], size=N, p=[.2, .4, .4]),
    "x3": np.random.choice([0, 1], size=N, p=[.1, .9]),
})
# init dask data frame 
ddf = dd.from_pandas(df, npartitions=6)

我想并行化一个简单的外部函数,比如:

def get_coalesce(ddf: dd.DataFrame, x1: str, x2: str, x3: str) -> dd.DataFrame:
    # Checks whether function is called
    print(1)
    # Assigns new columns whole data frame
    ddf["x4"] = ddf[x1].combine_first(ddf[x2]).combine_first(ddf[x3])
    return ddf

而不需要真正改变它的源代码。有解决方法吗?
我想使用集群,但这似乎是错误的方法:

from dask.distributed import LocalCluster, Client

with LocalCluster(
    n_workers=16,
    processes=True,
    threads_per_worker=2,
    memory_limit="10GB",
) as cluster, Client(cluster) as client:
    df = get_coalesce(ddf, **dict(zip(ddf, ddf))).compute()
bzzcjhmw

bzzcjhmw1#

首先,使用分布式集群或Dask本地调度程序与您的问题是正交的。您的最终代码应该可以使用或不使用LocalCluster
Dask没有实现combine_first方法,因为它太复杂了,无法以分布式方式实现。但是,由于您实际上只使用一个DataFrame,因此您应该能够使您的代码与map_partitions一起工作。
比如:

def get_coalesce(df: pd.DataFrame, x1: str, x2: str, x3: str) -> pd.DataFrame:
    # Assigns new columns whole data frame
    df["x4"] = df[x1].combine_first(df[x2]).combine_first(df[x3])
    return df

final_df = ddf.map_partitions(get_coalesce, "x1", "x2", "x3").compute()

相关问题