如何在多个线程/进程上拆分PandasDataFrame?

zbq4xfa0  于 2023-01-04  发布在  其他
关注(0)|答案(1)|浏览(229)

我有一个大约300 K行的dataFrame,我需要读取dataFrame,对于每一行,我需要执行以下操作:

for index, row in df.iterrows():
   # do something to row['col A'] and row['col B']
   # set value of row['col C'] based on result of the work done to row['col A'] and row['col B'], where row['col C'] is originally empty.

到目前为止,我所做的是通过执行以下操作将dataFrame拆分为多个子dataFrame:

df = pd.read_csv('file.csv')
df_split = np.array_split(df, 10)

def one_split(sub_df): # where sub_df is a unique sub-dataFrame of the original dataFrame for each thread/process to work on
   for index, row in sub_df.iterrows():
      # step 1: do something to row['col A'] and row['col B']
      # step 2: set value of row['col C'] based on result of step 1
      # step 3 (not implemented yet): copy values of 'col C' in sub_df to 'col C' in the original dataFrame, that is df in this case.

one_split(df_split[0])

我的目标是:
1.将工作拆分到10个线程/进程上以加快执行时间
1.每个线程/进程并行地执行X1 M0 N1 X。
1.在one_split()中实现步骤3,每个线程/进程将在原始dataFrame的不同部分上工作,因此我“认为”争用条件不是这里要考虑的问题。
请问我怎样才能达到我的目标?

编辑此图像是所需行为的示例:

2guxujil

2guxujil1#

这里有一个使用concurrent.futures的想法。使用你机器的cpu计数来分割df并将块馈送到ProcessPoolExecutor。main函数是你做反馈工作的地方,当所有块都被处理后,它将结果连接回一个大的df。

import concurrent.futures
import os

import numpy as np
import pandas as pd
import seaborn as sns

processors = os.cpu_count()
df = sns.load_dataset("diamonds")
batches = np.array_split(df, processors)

def process_data() -> None:
    with concurrent.futures.ProcessPoolExecutor(processors) as executer:
        result = pd.concat(executer.map(main, batches))

    print(result)

def main(batch: pd.DataFrame) -> pd.DataFrame:
    # Do stuff here
    return batch

if __name__ == "__main__":
    process_data()

相关问题