I have a DataFrame of about 300K rows. I need to read the DataFrame and, for each row, do the following:
for index, row in df.iterrows():
    # do something to row['col A'] and row['col B']
    # set value of row['col C'] based on the result of the work done on row['col A'] and row['col B'], where row['col C'] is originally empty
What I have done so far is split the DataFrame into several sub-DataFrames like this:
import numpy as np
import pandas as pd

df = pd.read_csv('file.csv')
df_split = np.array_split(df, 10)
def one_split(sub_df):  # sub_df is a unique sub-DataFrame of the original DataFrame for each thread/process to work on
    for index, row in sub_df.iterrows():
        # step 1: do something to row['col A'] and row['col B']
        # step 2: set value of row['col C'] based on the result of step 1
        # step 3 (not implemented yet): copy the values of 'col C' in sub_df back to 'col C' in the original DataFrame, i.e. df in this case
        pass

one_split(df_split[0])
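As a note on step 3, a minimal sketch (assuming each sub_df keeps its slice of the original index, which np.array_split preserves) would be to write the computed column back by shared index labels:

    # hypothetical step 3: copy the chunk's 'col C' values back into the original df
    df.loc[sub_df.index, 'col C'] = sub_df['col C']

This only works when the workers share memory (threads); separate processes receive a copy of df, which is why the answer below concatenates the returned chunks instead.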
My goals are:
1. Split the work across 10 threads/processes to speed up execution.
2. Have each thread/process run one_split() on its own chunk in parallel.
3. Implement step 3 in one_split(); each thread/process works on a different part of the original DataFrame, so I "think" race conditions are not a concern here.
How can I achieve these goals?
Edit: this image is an example of the desired behavior.
1 Answer
Here is an idea using concurrent.futures. Use your machine's CPU count to split the df and feed the chunks to a ProcessPoolExecutor. The main function is where you feed the work out; once all chunks have been processed, it concatenates the results back into one big df.
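A minimal sketch of that idea follows. The body of one_split here is only a stand-in; replace the 'col C' assignment with your real logic for 'col A' and 'col B'.

    import numpy as np
    import pandas as pd
    from concurrent.futures import ProcessPoolExecutor
    from multiprocessing import cpu_count

    def one_split(sub_df):
        # Work on a copy so each process owns its chunk outright.
        sub_df = sub_df.copy()
        # Stand-in for steps 1 and 2: derive 'col C' from 'col A' and 'col B'.
        sub_df['col C'] = sub_df['col A'].astype(str) + '-' + sub_df['col B'].astype(str)
        return sub_df

    def main():
        df = pd.read_csv('file.csv')
        # Split into as many chunks as there are CPU cores.
        chunks = np.array_split(df, cpu_count())
        with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
            # map() returns the processed chunks in the order they were submitted.
            results = executor.map(one_split, chunks)
        # Step 3: stitch the processed chunks back into a single DataFrame.
        # np.array_split keeps the original index, so row order is preserved.
        return pd.concat(results)

    if __name__ == '__main__':
        df = main()

The if __name__ == '__main__' guard matters here: on platforms that use the spawn start method, each worker process re-imports the module, and without the guard the pool would be created recursively.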