python 如何在异步协程中 Package 同步函数?

to94eoyn  于 2023-09-29  发布在  Python
关注(0)|答案(6)|浏览(143)

我正在使用aiohttp构建一个API服务器,它将TCP请求发送到一个单独的服务器。发送TCP请求的模块是同步的,对我来说是一个黑盒。所以我的问题是这些请求阻塞了整个API。我需要一种方法来将模块请求 Package 在一个异步协同程序中,而不会阻塞API的其余部分。
那么,仅使用sleep作为一个简单的例子,是否有任何方法可以以某种方式将耗时的同步代码 Package 在一个非阻塞的协程中,就像这样:

async def sleep_async(delay):
    # After calling sleep, loop should be released until sleep is done
    yield sleep(delay)
    return 'I slept asynchronously'
tcomlyy6

tcomlyy61#

最后,我在this thread中找到了答案。我正在寻找的方法是run_in_executor。这允许同步函数异步运行,而不会阻塞事件循环。
在我上面发布的sleep示例中,它可能看起来像这样:

import asyncio
from time import sleep

async def sleep_async(loop, delay):
    # None uses the default executor (ThreadPoolExecutor)
    await loop.run_in_executor(None, sleep, delay)
    return 'I slept asynchronously'

另请参阅以下答案-> How do we call a normal function where a coroutine is expected?

9njqaruj

9njqaruj2#

您可以使用装饰器将同步版本 Package 为异步版本。

import time
from functools import wraps, partial

def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run

@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'

过时,aloify为维护模式

或使用aioify

% pip install aioify

然后

@aioify
def sleep_async(delay):
    pass
i86rm4rw

i86rm4rw3#

从python 3.9开始,最干净的方法是使用asyncio.to_thread方法,这基本上是run_in_executor的快捷方式,但保留了所有的contextvars。
另外,请考虑GIL,因为它是一个to_线程。您仍然可以为numpy之类的东西运行CPU限制的任务。从文档中:

Note Due to the GIL, asyncio.to_thread() can typically only be used to make IO-bound functions non-blocking. However, for extension modules that release the GIL or alternative Python implementations that don’t have one, asyncio.to_thread() can also be used for CPU-bound functions.

同步功能的使用示例:

def blocking_io():
    time.sleep(1)

async def main():
    asyncio.to_thread(blocking_io)

asyncio.run(main())
hujrc8aj

hujrc8aj4#

也许有人需要我解决这个问题。我写了自己的库来解决这个问题,它允许你使用装饰器使任何函数异步。
要安装库,请运行以下命令:

$ pip install awaits

要使任何函数异步,只需添加@awaitable装饰器,如下所示:

import time
import asyncio
from awaits.awaitable import awaitable

@awaitable
def sum(a, b):
  # heavy load simulation
  time.sleep(10)
  return a + b

现在你可以确保你的函数是真正的异步协程:

print(asyncio.run(sum(2, 2)))

“在引擎盖下”,您的函数将在线程池中执行。此线程池不会在每次调用函数时重新创建。线程池创建一次,并通过队列接受新任务。这将使您的程序比使用其他解决方案运行得更快,因为创建额外的线程是一种额外的开销。

gfttwv5a

gfttwv5a5#

装饰器在这种情况下会很有用,它可以在另一个线程中运行阻塞函数。

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Union

class to_async:

    def __init__(self, *, executor: Optional[ThreadPoolExecutor]=None):
       
        self.executor =  executor
    
    def __call__(self, blocking):
        @wraps(blocking)
        async def wrapper(*args, **kwargs):

            loop = asyncio.get_event_loop()
            if not self.executor:
                self.executor = ThreadPoolExecutor()

            func = partial(blocking, *args, **kwargs)
        
            return await loop.run_in_executor(self.executor,func)

        return wrapper

@to_async(executor=None)
def sync(*args, **kwargs):
    print(args, kwargs)
   
asyncio.run(sync("hello", "world", result=True))
slhcrj9b

slhcrj9b6#

不知道是否太晚,但你也可以使用装饰器在线程中完成你的功能。尽管如此,请注意,它仍然是非合作阻塞,不像异步是合作阻塞。

def wrap(func):
    from concurrent.futures import ThreadPoolExecutor
    pool=ThreadPoolExecutor()
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        future=pool.submit(func, *args, **kwargs)
        return asyncio.wrap_future(future)
    return run

相关问题