我正在进入dask
,并希望使用dask.distribution
库来并行计算。考虑以下示例 Dataframe :
import pandas as pd
import numpy as np
import dask.dataframe as dd
N = 100000000
# init pandas data frame
df = pd.DataFrame(data={
"x1": np.random.choice([0, 1, np.nan], size=N, p=[.45, .3, .25]),
"x2": np.random.choice([0, 1, np.nan], size=N, p=[.2, .4, .4]),
"x3": np.random.choice([0, 1], size=N, p=[.1, .9]),
})
# init dask data frame
ddf = dd.from_pandas(df, npartitions=6)
我想并行化一个简单的外部函数,比如:
def get_coalesce(ddf: dd.DataFrame, x1: str, x2: str, x3: str) -> dd.DataFrame:
# Checks whether function is called
print(1)
# Assigns new columns whole data frame
ddf["x4"] = ddf[x1].combine_first(ddf[x2]).combine_first(ddf[x3])
return ddf
而不需要真正改变它的源代码。有解决方法吗?
我想使用集群,但这似乎是错误的方法:
from dask.distributed import LocalCluster, Client
with LocalCluster(
n_workers=16,
processes=True,
threads_per_worker=2,
memory_limit="10GB",
) as cluster, Client(cluster) as client:
df = get_coalesce(ddf, **dict(zip(ddf, ddf))).compute()
1条答案
按热度按时间bzzcjhmw1#
首先,使用分布式集群或Dask本地调度程序与您的问题是正交的。您的最终代码应该可以使用或不使用
LocalCluster
。Dask没有实现
combine_first
方法,因为它太复杂了,无法以分布式方式实现。但是,由于您实际上只使用一个DataFrame,因此您应该能够使您的代码与map_partitions
一起工作。比如: