假设我们有一堆链接要下载,每个链接下载所需的时间不同,我只能使用最多3个连接进行下载,现在,我想确保使用asyncio可以高效地完成这一任务。
这是我想要达到的目标:在任何时间点,尽量确保我有至少3个下载运行。
Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----
数字代表下载链接,而连字符代表等待下载。
这是我现在使用的代码
from random import randint
import asyncio
count = 0
async def download(code, permit_download, no_concurrent, downloading_event):
global count
downloading_event.set()
wait_time = randint(1, 3)
print('downloading {} will take {} second(s)'.format(code, wait_time))
await asyncio.sleep(wait_time) # I/O, context will switch to main function
print('downloaded {}'.format(code))
count -= 1
if count < no_concurrent and not permit_download.is_set():
permit_download.set()
async def main(loop):
global count
permit_download = asyncio.Event()
permit_download.set()
downloading_event = asyncio.Event()
no_concurrent = 3
i = 0
while i < 9:
if permit_download.is_set():
count += 1
if count >= no_concurrent:
permit_download.clear()
loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
await downloading_event.wait() # To force context to switch to download function
downloading_event.clear()
i += 1
else:
await permit_download.wait()
await asyncio.sleep(9)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
并且输出如预期:
downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8
但我的问题是:
1.目前,我只是等待9秒钟来保持main函数运行,直到下载完成。有没有一种有效的方法可以在退出main
函数之前等待最后一次下载完成?(我知道有asyncio.wait
,但我需要存储所有的任务引用才能使其工作)
1.有什么好的库可以完成这种任务呢?我知道javascript有很多异步库,但是Python呢?
编辑:2.什么样的库可以处理常见的异步模式?(类似于async)
8条答案
按热度按时间a2mppw5e1#
如果我没记错的话,您正在搜索asyncio.Semaphore。用法示例:
输出:
aiohttp
异步下载的一个例子是here。注意aiohttp
有一个内置的信号量等价物,你可以看到一个here的例子。它有一个默认的100个连接的限制。pcrecxhr2#
我用了Mikhail Gerasimov的答案,最后得到了这个小宝石
您将运行它而不是正常的收集
8oomwypt3#
在阅读本答案的其余部分之前,请注意,限制异步并行任务数量的惯用方法是使用
asyncio.Semaphore
,如Mikhail's answer所示,并在Andrei's answer中进行了优雅的抽象。本答案包含实现相同目的的有效方法,但有点复杂。我不回答这个问题,因为在某些情况下,这种方法比信号量更有优势。特别是当要做的工作非常大或者是无限的,并且你不能提前创建所有的协程时,在这种情况下,第二个(基于队列的)解决方案是这个答案是你想要的,但是在大多数常规的情况下,比如通过aiohttp并行下载,你应该使用信号量来代替。基本上,您需要一个固定大小的下载任务池。
asyncio
没有预先创建任务池,但创建一个任务池很容易:简单地保留一组任务,不要让它超出限制。虽然问题表明您不愿意走这条路,但代码最终要优雅得多:另一种方法是创建一个固定数量的协同程序来执行下载,就像一个固定大小的线程池,然后使用
asyncio.Queue
来为它们提供工作,这样就不需要手动限制下载的数量,下载的数量会自动受到调用download()
的协同程序的数量的限制:至于您的另一个问题,显而易见的选择是
aiohttp
。0tdrvxhp4#
异步池库完全可以满足您的需要。
https://pypi.org/project/asyncio-pool/
h7appiyu5#
如果你有一个生成任务的生成器,那么可能会有更多的任务,而你的内存无法同时容纳。
经典的
asyncio.Semaphore
上下文管理器模式将所有任务同时争用内存。我不喜欢
asyncio.Queue
模式。你可以阻止它将所有任务预加载到内存中(通过设置maxsize=1
),但它仍然需要样板来定义、启动和关闭工作者协程(它从que中消耗),而且你必须确保工作者不会在任务抛出异常时失败。这感觉很荒谬,就像实现你自己的multiprocessing.pool
一样。相反,这里有一个替代方案:
当有足够多的活动任务时,这会暂停生成器,并让事件循环清理已完成的任务。注意,对于旧的python版本,请将
create_task
替换为ensure_future
。yyhrrdl86#
使用信号量,您还可以创建一个装饰器来 Package 函数
然后,将装饰器添加到源下载函数中。
现在您可以像以前一样调用下载函数,但是使用信号量来限制并发性。
需要注意的是,在执行decorator函数时,创建的信号量绑定到默认的事件循环,因此不能调用
asyncio.run
来创建新的循环,而是调用asyncio.get_event_loop().run...
来使用默认的事件循环。asyncio.Semaphore RuntimeError: Task got Future attached to a different loop
lrpiutwd7#
**小更新:**不再需要创建循环。我调整了下面的代码。只是稍微清理了一下。
ws51t4hk8#
在Windows上使用FastAPI时,我们可能会受到并发连接数的限制,因为默认值是64(由变量FD_SETSIZE定义)。
更多信息请访问https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select?redirectedfrom=MSDN
尽管定义了ProactorEventLoop(它使用IOCP),但在Python 3.7之前的版本中,使用了 select() 例程,导致了异常。
另一种方法是使用Andrei's answer来限制ML/DL上下文中的并发连接数。使用 asyncio + hypercorn + FastAPI,代码如下:
**观察结果:**此脚本在
Python 3.7.16
上针对Apache JMeter上的1000个工作线程进行了测试。