How to limit concurrency with Python asyncio?

Posted by 1bqhqjot on 2023-02-26 in Python

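Suppose we have a bunch of links to download, each of which takes a different amount of time, and I may use at most 3 connections for downloading. I want to make sure I do this efficiently with asyncio.
Here is what I am trying to achieve: at any point in time, try to ensure that I have at least 3 downloads running.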

Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----

The numbers represent the download links, and the hyphens represent waiting for a download.
Here is the code I am using right now:

from random import randint
import asyncio

count = 0

async def download(code, permit_download, no_concurrent, downloading_event):
    global count
    downloading_event.set()
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))
    count -= 1
    if count < no_concurrent and not permit_download.is_set():
        permit_download.set()

async def main(loop):
    global count
    permit_download = asyncio.Event()
    permit_download.set()
    downloading_event = asyncio.Event()
    no_concurrent = 3
    i = 0
    while i < 9:
        if permit_download.is_set():
            count += 1
            if count >= no_concurrent:
                permit_download.clear()
            loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
            await downloading_event.wait()  # To force context to switch to download function
            downloading_event.clear()
            i += 1
        else:
            await permit_download.wait()
    await asyncio.sleep(9)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

And the output is as expected:

downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8

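But here are my questions:
1. Currently, I simply wait 9 seconds to keep the main function running until the downloads are complete. Is there an efficient way to wait for the last download to finish before exiting main? (I know there is asyncio.wait, but I would need to store all the task references for it to work.)
2. What is a good library for this kind of task? I know JavaScript has a lot of async libraries, but what about Python?
Edit: 2. What is a library that takes care of common async patterns? (Something like async)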

a2mppw5e1#

If I'm not mistaken, you are looking for asyncio.Semaphore. Example of usage:

import asyncio
from random import randint

async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))

sem = asyncio.Semaphore(3)

async def safe_download(i):
    async with sem:  # semaphore limits num of simultaneous downloads
        return await download(i)

async def main():
    tasks = [
        asyncio.ensure_future(safe_download(i))  # creating task starts coroutine
        for i
        in range(9)
    ]
    await asyncio.gather(*tasks)  # await moment all downloads done

if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

Output:

downloading 0 will take 3 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 3 second(s)
downloaded 1
downloaded 0
downloading 4 will take 2 second(s)
downloading 5 will take 1 second(s)
downloaded 5
downloaded 3
downloading 6 will take 3 second(s)
downloading 7 will take 1 second(s)
downloaded 4
downloading 8 will take 2 second(s)
downloaded 7
downloaded 8
downloaded 6

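An example of asynchronous downloading with aiohttp can be found here. Note that aiohttp has a built-in equivalent of a semaphore; an example of it can be seen here. It has a default limit of 100 connections.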

pcrecxhr2#

I used Mikhail Gerasimov's answer and ended up with this little gem:

async def gather_with_concurrency(n, *coros):
    semaphore = asyncio.Semaphore(n)

    async def sem_coro(coro):
        async with semaphore:
            return await coro
    return await asyncio.gather(*(sem_coro(c) for c in coros))

You would run it instead of the normal gather:

await gather_with_concurrency(100, *my_coroutines)
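
As a rough, self-contained illustration of how this helper behaves, the sketch below runs nine simulated downloads with a limit of 3 and records the peak number running at once. `fake_download`, the two counters, and the sleep duration are hypothetical stand-ins and not part of the answer above.

```python
import asyncio

async def gather_with_concurrency(n, *coros):
    semaphore = asyncio.Semaphore(n)

    async def sem_coro(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(sem_coro(c) for c in coros))

running = 0  # downloads currently in flight (illustrative counter)
peak = 0     # highest value of `running` seen so far

async def fake_download(code):
    # Hypothetical stand-in for a real download.
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)
    running -= 1
    return code * 2

async def demo():
    results = await gather_with_concurrency(
        3, *(fake_download(i) for i in range(9)))
    return results, peak

results, observed_peak = asyncio.run(demo())
```

Because the helper is built on asyncio.gather, `results` comes back in the order the coroutines were passed in, regardless of which one finished first. Note that every coroutine object is still created up front; only their execution is throttled.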

8oomwypt3#

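Before reading the rest of this answer, please note that the idiomatic way to limit the number of parallel tasks with asyncio is asyncio.Semaphore, as shown in Mikhail's answer and elegantly abstracted in Andrei's answer. This answer contains working, but somewhat more complicated, ways of achieving the same thing. I am leaving this answer up because in some cases this approach has advantages over a semaphore, specifically when the work to be done is very large or unbounded and you cannot create all the coroutines in advance. In that case the second (queue-based) solution in this answer is what you want. In most regular situations, such as parallel downloads through aiohttp, you should use a semaphore instead.
You basically need a fixed-size pool of download tasks. asyncio does not come with a pre-made task pool, but it is easy to create one: simply keep a set of tasks and do not allow it to grow past the limit. Although the question states your reluctance to go down that route, the code ends up much more elegant: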

import asyncio, random

async def download(code):
    wait_time = random.randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))

async def main(loop):
    no_concurrent = 3
    dltasks = set()
    i = 0
    while i < 9:
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(
                dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(loop.create_task(download(i)))
        i += 1
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)

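An alternative is to create a fixed number of coroutines doing the downloading, much like a fixed-size thread pool, and feed them work through an asyncio.Queue. This removes the need to limit the number of downloads manually: it is automatically limited by the number of coroutines invoking download():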

# download() defined as above

async def download_worker(q):
    while True:
        code = await q.get()
        await download(code)
        q.task_done()

async def main(loop):
    q = asyncio.Queue()
    workers = [loop.create_task(download_worker(q)) for _ in range(3)]
    i = 0
    while i < 9:
        await q.put(i)
        i += 1
    await q.join()  # wait for all tasks to be processed
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
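
For the very large or unbounded case mentioned at the start of this answer, the queue itself can be bounded so that the producer never runs far ahead of the workers. The following is only a sketch under assumptions: the `download` stub, the `results` list, and the `n_workers` parameter are illustrative and not part of the code above.

```python
import asyncio

async def download(code):
    # Hypothetical stand-in for the real download() above.
    await asyncio.sleep(0.01)
    return code

async def download_worker(q, results):
    while True:
        code = await q.get()
        try:
            results.append(await download(code))
        except Exception as exc:
            # Record the failure so one bad item does not kill the worker.
            results.append(exc)
        finally:
            q.task_done()

async def main(codes, n_workers=3):
    q = asyncio.Queue(maxsize=n_workers)  # bounded: put() waits while full
    results = []
    workers = [asyncio.create_task(download_worker(q, results))
               for _ in range(n_workers)]
    for code in codes:  # codes may be a lazy iterable
        await q.put(code)
    await q.join()  # wait until every queued item has been processed
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results

results = asyncio.run(main(iter(range(9))))
```

With `maxsize` set, `q.put()` suspends the producer while the queue is full, so only a handful of items exist at any time even if `codes` is a generator that never ends. The `try`/`finally` around the download ensures `task_done()` is always called, so `q.join()` cannot hang because of a failed item.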

As for your other question, the obvious choice would be aiohttp.

0tdrvxhp4#

The asyncio-pool library does exactly what you need.
https://pypi.org/project/asyncio-pool/

from asyncio_pool import AioPool

LIST_OF_URLS = ("http://www.google.com", "......")

pool = AioPool(size=3)
await pool.map(your_download_coroutine, LIST_OF_URLS)

h7appiyu5#

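If you have a generator producing your tasks, there may be more tasks than you can fit in memory at the same time.
The classic asyncio.Semaphore context-manager pattern races all tasks into memory simultaneously.
I don't like the asyncio.Queue pattern. You can prevent it from preloading all tasks into memory (by setting maxsize=1), but it still requires boilerplate to define, start, and shut down the worker coroutines (which consume from the queue), and you have to make sure a worker does not fail when a task throws an exception. It feels absurd, like implementing your own multiprocessing.Pool.
Instead, here is an alternative: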

sem = asyncio.Semaphore(n := 5) # specify maximum concurrency

async def task_wrapper(args):
    try:
        await my_task(*args)
    finally:
        sem.release()

for args in my_generator: # may yield too many to list
    await sem.acquire() 
    asyncio.create_task(task_wrapper(args))

# wait for all tasks to complete
for i in range(n):
    await sem.acquire()

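This pauses the generator when there are enough active tasks and lets the event loop clean up finished tasks. Note that for older Python versions you should replace create_task with ensure_future.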
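
A runnable variant of this pattern might look like the sketch below. It is an illustration under assumptions rather than the author's exact code: `my_task` is a stand-in, and the `pending` set and `finished` list are additions. The set is there because the event loop only keeps weak references to tasks, so holding a strong reference until each task is done is the safer habit.

```python
import asyncio

async def my_task(x):
    # Hypothetical stand-in for real work.
    await asyncio.sleep(0.01)
    return x

async def run_limited(arg_source, n=5):
    sem = asyncio.Semaphore(n)
    pending = set()  # strong references so tasks are not garbage-collected
    finished = []

    async def task_wrapper(args):
        try:
            finished.append(await my_task(*args))
        finally:
            sem.release()  # free the slot even if my_task raised

    for args in arg_source:
        await sem.acquire()  # stops pulling from the generator at n active tasks
        task = asyncio.create_task(task_wrapper(args))
        pending.add(task)
        task.add_done_callback(pending.discard)

    # Re-acquiring all n slots succeeds only once every task has released its own.
    for _ in range(n):
        await sem.acquire()
    return finished

finished = asyncio.run(run_limited(((i,) for i in range(12)), n=5))
```

Results arrive in completion order, not submission order. An exception raised inside `my_task` still releases the slot, but it is not retrieved by anyone in this sketch, so asyncio would log it as an unretrieved task exception; catching it inside `task_wrapper` is one way to handle that.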

yyhrrdl86#

Using a semaphore, you can also create a decorator to wrap the function:

import asyncio
from functools import wraps
def request_concurrency_limit_decorator(limit=3):
    # Bind the default event loop 
    sem = asyncio.Semaphore(limit)

    def executor(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with sem:
                return await func(*args, **kwargs)

        return wrapper

    return executor

Then add the decorator to the original download function.

@request_concurrency_limit_decorator(limit=...)
async def download(...):
    ...

Now you can call the download function as before, but with a semaphore limiting the concurrency.

await download(...)

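Note that the semaphore is created when the decorator function is executed and is bound to the default event loop at that moment, so you cannot call asyncio.run, which creates a new loop; call asyncio.get_event_loop().run... instead to use the default event loop. (This binding at construction time applies to Python versions before 3.10; from 3.10 on, asyncio primitives attach to the running loop on first use.)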
asyncio.Semaphore RuntimeError: Task got Future attached to a different loop
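
One way to sidestep the loop-binding problem is to create the semaphore lazily, inside the wrapper, the first time it is called on a given loop. This is only a sketch under assumptions: the name `concurrency_limit`, the per-loop dictionary, and the demo `download` function are illustrative choices, not part of the answer above.

```python
import asyncio
from functools import wraps

def concurrency_limit(limit=3):
    sems = {}  # one semaphore per event loop, created on first call

    def executor(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            sem = sems.get(loop)
            if sem is None:
                # Created while the loop is running, so it belongs to this loop.
                sem = sems[loop] = asyncio.Semaphore(limit)
            async with sem:
                return await func(*args, **kwargs)
        return wrapper
    return executor

active = 0  # illustrative counters to observe the limit
peak = 0

@concurrency_limit(limit=2)
async def download(code):
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.01)
    active -= 1
    return code

async def main():
    return await asyncio.gather(*(download(i) for i in range(6)))

first = asyncio.run(main())   # each asyncio.run() call creates a fresh loop
second = asyncio.run(main())  # works because a new semaphore is made for it
```

The trade-off is that the dictionary keeps a reference to every loop it has seen, which is harmless for a script that calls asyncio.run a few times but would need cleanup in a program that creates many loops.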

lrpiutwd7#

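**Small update:** Creating the loop explicitly is no longer necessary. I adjusted the code below accordingly; it just cleans things up a little.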

# download(code) is the same

async def main():
    no_concurrent = 3
    dltasks = set()
    for i in range(9):
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(asyncio.create_task(download(i)))
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)

if __name__ == '__main__':
    asyncio.run(main())
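
The loop above discards the finished tasks, so return values and exceptions are never looked at. If they matter, a sketch along the following lines collects them as tasks complete. The `download` stub that returns a value, the `results` list, and the parameters of `main` are assumptions made for illustration.

```python
import asyncio

async def download(code):
    # Hypothetical stand-in; returns a value so there is something to collect.
    await asyncio.sleep(0.01)
    return code * 10

async def main(codes, no_concurrent=3):
    dltasks = set()
    results = []
    for code in codes:
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            done, dltasks = await asyncio.wait(
                dltasks, return_when=asyncio.FIRST_COMPLETED)
            # result() re-raises here if that download failed
            results.extend(task.result() for task in done)
        dltasks.add(asyncio.create_task(download(code)))
    if dltasks:  # asyncio.wait() raises ValueError on an empty set
        done, _pending = await asyncio.wait(dltasks)
        results.extend(task.result() for task in done)
    return results

results = asyncio.run(main(range(9)))
empty = asyncio.run(main([]))
```

Results come back in completion order. Calling `task.result()` also surfaces any exception at the point where it is collected instead of leaving it unretrieved.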

ws51t4hk8#

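When using FastAPI on Windows, we can be limited in the number of concurrent connections, because the default is 64 (defined by the variable FD_SETSIZE).
More information at https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select?redirectedfrom=MSDN
Even though ProactorEventLoop (which uses IOCP) is defined, in Python 3.7 and earlier the select() routine is used, which results in an exception.
Another approach is to use Andrei's answer to limit the number of concurrent connections in an ML/DL context. Using asyncio + hypercorn + FastAPI, the code looks like this: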

from hypercorn.config import Config
from hypercorn.asyncio import serve
from fastapi import FastAPI
import asyncio
import logging
import time
import sys

# Logger used in the Windows branch below
logger = logging.getLogger(__name__)

app = FastAPI()
conn_limit = 10

async def gather_with_concurrency(n, *coros):
    """
    From Andrei's answer
    """
    semaphore = asyncio.Semaphore(n)

    async def sem_coro(coro):
        async with semaphore:
            return await coro
    return await asyncio.gather(*(sem_coro(c) for c in coros))

@app.get('/app/test')
def req_test():
    time.sleep(1)
    return {"test": "ok"}

if __name__ == "__main__":
    # Start the loop
    config = Config()
    config.bind = [f"0.0.0.0:12000"]
    config.workers = 1
    if sys.platform == 'win32':
        logger.info("Setting proactor event loop for Windows platform.")
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(gather_with_concurrency(conn_limit, serve(app, config)))
    loop.close()

**Observation:** This script was tested on Python 3.7.16 against 1000 worker threads in Apache JMeter.
