python 用于库Jupyter笔记本安全异步运行 Package 方法

von4xj4u  于 2022-12-10  发布在  Python
关注(0)|答案(1)|浏览(154)

我正在构建一个在内部利用asyncio的库,虽然用户不应该意识到这一点,但内部实现目前用asyncio.run()陶瓷 Package 器 Package 了异步代码。
然而,有些用户将从jupyter笔记本电脑上执行这个库代码,我正在努力用一个对两种环境都安全的 Package 器来替换asyncio.run()
以下是我尝试过的方法:

ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'

def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False)
    try:
        loop = asyncio.get_running_loop()
        task = loop.create_task(async_coroutine)
        result = loop.run_until_complete(task) # <- fails as loop is already running
        # OR
        asyncio.wait_for(task, timeout=None, loop=loop) # <- fails as this is an async method
        result = task.result()
    except RuntimeError as e:
        if _test_mode:
            raise e
        if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
            return asyncio.run(async_coroutine)
    except Exception as e:
        raise e

要求

1.我们使用的是python 3.8,所以不能使用asyncio。
1.我们不能使用线程,因此建议的here解决方案无法工作

问题:

我如何等待async_coroutineloop.create_task(async_coroutine)提供的任务/未来完成?
上面的方法都没有真正执行等待,原因在注解中说明。

更新

我找到了这个nest_asyncio库,它正是为解决这个问题而构建的:

ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'

HAS_BEEN_RUN = False

def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False):
    global HAS_BEEN_RUN
    if not HAS_BEEN_RUN:
        _apply_nested_asyncio_patch()
        HAS_BEEN_RUN = True
    return asyncio.run(async_coroutine)

def _apply_nested_asyncio_patch():
    try:
        loop = asyncio.get_running_loop()
        logger.info(f'as get_running_loop() returned {loop}, this environment has it`s own event loop.\n'
                    f'Patching with nest_asyncio')
        import nest_asyncio
        nest_asyncio.apply()
    except RuntimeError as e:
        if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
            logger.info(f'as get_running_loop() raised {e}, this environment does not have it`s own event loop.\n'
                        f'No patching necessary')
        else:
            raise e

不过,我仍然面临着一些问题:
1.根据this SO answer,可能存在资源不足问题
1.在async_coroutine中写入的任何日志都不会打印在jupyter笔记本中

  1. jupyter notebook内核在完成任务时偶尔会崩溃

编辑

对于上下文,该库在内部调用外部API,以便对用户提供的 Dataframe 进行数据丰富:

# user code using the library
import my_lib

df = pd.DataFrame(data='some data')
enriched_df = my_lib.enrich(df)
wn9m85ua

wn9m85ua1#

我仍然不知道你的目标是什么,但我会用代码描述我试图在我的评论中解释的东西,这样你就可以告诉我你的问题在哪里。
如果您的包请求用户调用一些将协程作为参数的函数(示例中的your_package_function),那么您不应该担心事件循环。
这意味着包不应该调用asyncio.runloop.run_until_complete。客户端应该(在几乎所有情况下)负责启动even循环。
你的包代码应该假设有一个事件循环在运行。因为我不知道你的包的目标,我只是做了一个函数,它将一个"test"参数提供给客户端正在传递的任何协程:

import asyncio

async def your_package_function(coroutine):
    print("- Package internals start")
    task = asyncio.create_task(coroutine("test"))
    await asyncio.sleep(.5) # Simulates slow tasks within your package
    print("- Package internals completed other task")
    x = await task
    print("- Package internals end")
    return x

然后,客户端(软件包用户)应调用以下内容:

async def main():
    x = await your_package_function(return_with_delay)
    print(f"Computed value = {x}")

async def return_with_delay(value):
    print("+ User function start")
    await asyncio.sleep(.2)
    print("+ User function end")
    return value

await main()
# or asyncio.run(main()) if needed

这将打印:

- Package internals start
- Package internals completed other task
+ User function start
+ User function end
- Package internals end
Computed value = test

相关问题