我正在构建一个在内部利用asyncio的库,虽然用户不应该意识到这一点,但内部实现目前用asyncio.run()陶瓷 Package 器 Package 了异步代码。
然而,有些用户将从jupyter笔记本电脑上执行这个库代码,我正在努力用一个对两种环境都安全的 Package 器来替换asyncio.run()
。
以下是我尝试过的方法:
ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'
def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False)
try:
loop = asyncio.get_running_loop()
task = loop.create_task(async_coroutine)
result = loop.run_until_complete(task) # <- fails as loop is already running
# OR
asyncio.wait_for(task, timeout=None, loop=loop) # <- fails as this is an async method
result = task.result()
except RuntimeError as e:
if _test_mode:
raise e
if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
return asyncio.run(async_coroutine)
except Exception as e:
raise e
要求
1.我们使用的是python 3.8,所以不能使用asyncio。
1.我们不能使用线程,因此建议的here解决方案无法工作
问题:
我如何等待async_coroutine
或loop.create_task(async_coroutine)
提供的任务/未来完成?
上面的方法都没有真正执行等待,原因在注解中说明。
更新
我找到了这个nest_asyncio库,它正是为解决这个问题而构建的:
ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'
HAS_BEEN_RUN = False
def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False):
global HAS_BEEN_RUN
if not HAS_BEEN_RUN:
_apply_nested_asyncio_patch()
HAS_BEEN_RUN = True
return asyncio.run(async_coroutine)
def _apply_nested_asyncio_patch():
try:
loop = asyncio.get_running_loop()
logger.info(f'as get_running_loop() returned {loop}, this environment has it`s own event loop.\n'
f'Patching with nest_asyncio')
import nest_asyncio
nest_asyncio.apply()
except RuntimeError as e:
if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
logger.info(f'as get_running_loop() raised {e}, this environment does not have it`s own event loop.\n'
f'No patching necessary')
else:
raise e
不过,我仍然面临着一些问题:
1.根据this SO answer,可能存在资源不足问题
1.在async_coroutine中写入的任何日志都不会打印在jupyter笔记本中
- jupyter notebook内核在完成任务时偶尔会崩溃
编辑
对于上下文,该库在内部调用外部API,以便对用户提供的 Dataframe 进行数据丰富:
# user code using the library
import my_lib
df = pd.DataFrame(data='some data')
enriched_df = my_lib.enrich(df)
1条答案
按热度按时间wn9m85ua1#
我仍然不知道你的目标是什么,但我会用代码描述我试图在我的评论中解释的东西,这样你就可以告诉我你的问题在哪里。
如果您的包请求用户调用一些将协程作为参数的函数(示例中的
your_package_function
),那么您不应该担心事件循环。这意味着包不应该调用
asyncio.run
或loop.run_until_complete
。客户端应该(在几乎所有情况下)负责启动even循环。你的包代码应该假设有一个事件循环在运行。因为我不知道你的包的目标,我只是做了一个函数,它将一个
"test"
参数提供给客户端正在传递的任何协程:然后,客户端(软件包用户)应调用以下内容:
这将打印: