Asyncio Pandas与Inplace

lhcgjxsq  于 2023-05-05  发布在  其他
关注(0)|答案(1)|浏览(315)

我只是read this introduction,但在实现这两个示例时遇到了麻烦(注解代码是第二个示例):

import asyncio
import pandas as pd
from openpyxl import load_workbook

async def loop_dfs(dfs):
    async def clean_df(df):
        df.drop(["column_1"], axis=1, inplace=True)
        ... a bunch of other inplace=True functions ...
        return "Done"

    # tasks = [clean_df(df) for (table, dfs) in dfs.items()]
    # await asyncio.gather(*tasks)

    tasks = [clean_df(df) for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)

def main():
    dfs = {
        sn: pd.read_excel("excel.xlsx", sheet_name=sn)
        for sn in load_workbook("excel.xlsx").sheetnames
    }

    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(loop_dfs(dfs))

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(loop_dfs(dfs))
    finally:
        loop.close()

main()

我看到了其他一些关于pandas不支持asyncio的帖子,也许我只是错过了一个更大的画面,但如果我正在做原地操作,那就不重要了,对吗?I saw recommendations for Dask但没有立即支持阅读excel,我想我会先尝试这个,但我不断得到
RuntimeError: Event loop already running

pepwfjgg

pepwfjgg1#

我看到了其他一些关于pandas不支持asyncio的帖子,也许我只是错过了一个更大的画面,但如果我正在做原地操作,那就不重要了,对吗?
就地操作是那些modify existing data。这是一个效率问题,而你的目标似乎是并行化,这是一个完全不同的问题。
Pandas不支持asyncio,不仅因为它还没有实现,而且因为Pandas通常不支持asyncio支持的操作:网络和子进程IO。Pandas函数要么使用CPU,要么等待磁盘访问,这两种方式都不适合asyncio。Asyncio允许用协同程序来表达网络通信,协同程序看起来就像普通的同步代码。在协同程序中,每个阻塞操作(例如网络读取)是await艾德,如果数据还不可用,则自动挂起整个任务。在每一个这样的暂停系统切换到下一个任务,有效地创建一个合作的多任务系统。
当试图调用不支持asyncio的库时,比如pandas,表面上看起来可以工作,但你不会得到任何好处,代码将串行运行。例如:

async def loop_dfs(dfs):
    async def clean_df(df):
        ...    
    tasks = [clean_df(df) for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)

由于clean_df不包含await的单个示例,因此它只是一个名义上的协程-它实际上永远不会暂停其执行以允许其他协程运行。因此,await asyncio.wait(tasks)将连续运行任务,就像你写的那样:

for table, df in dfs.items():
    clean_df(df)

为了让事情并行运行(假设pandas偶尔会在其操作期间释放GIL),您应该将单个CPU绑定函数交给线程池:

async def loop_dfs(dfs):
    def clean_df(df):  # note: ordinary def
        ...
    loop = asyncio.get_event_loop()
    tasks = [loop.run_in_executor(clean_df, df)
             for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)

如果你走这条路,你首先不需要asyncio,你可以简单地使用concurrent.futures。例如:

def loop_dfs(dfs):  # note: ordinary def
    def clean_df(df):
        ...
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(clean_df, df)
                   for (table, df) in dfs.items()]
        concurrent.futures.wait(futures)

我想我应该先试试这个,但我一直得到RuntimeError: Event loop already running
这个错误通常意味着你已经在一个已经运行asyncio的环境中启动了脚本,比如jupyter notebook。如果是这种情况,请确保使用标准python运行脚本,或者参考notebook的文档,了解如何更改代码以将协程提交到已经运行的事件循环。

相关问题