我正在写一个python脚本,每隔几分钟检查几个域的域名。我在while循环中为每个网站运行一个协程,并在检查完成后让它们睡觉。这是可行的,但我也希望能够取消它们。我遇到的问题是,当我在tagencio.gather()中等待这些协程时,它们阻塞了线程,因为它们从不返回结果。
如果去掉await asyncio.gather(*tasks.values(), return_exceptions=True)
,得到RuntimeError: Session is closed
我怎样才能在不阻塞线程的情况下运行它们?
这是代码的简化版本。我正在使用一个简单的aiohttp服务器进行测试。
服务器代码:
from aiohttp import web
import asyncio
import random
async def handle(request: web.Request) -> web.Response:
await asyncio.sleep(random.randint(0, 3))
return web.Response(text=f"Hello, from {request.rel_url.path}")
app = web.Application()
app.router.add_route('GET', '/{name}', handle)
web.run_app(app)
正常运行时间检查器代码:
import asyncio
import aiohttp
LIMIT = 2
async def check_uptime_coro(session: aiohttp.ClientSession, url: str, semaphore: asyncio.BoundedSemaphore) -> None:
while True:
try:
async with semaphore:
async with session.get(url) as response:
if response.status != 200:
print(f"error with {url} {response.status}")
else:
print(f"success with {url}")
await asyncio.sleep(5)
except Exception as e:
print(f"error with {url} {e}")
async def main() -> None:
urls = [f"http://localhost:8080/{x}" for x in range(0, 10)]
tasks = {}
semaphore = asyncio.BoundedSemaphore(LIMIT)
try:
async with aiohttp.ClientSession() as session:
for url in urls:
tasks[url] = asyncio.create_task(
check_uptime_coro(session, url, semaphore))
await asyncio.gather(*tasks.values(), return_exceptions=True)
print("This doesn't print!")
except Exception as e:
print(f"error! {e}")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("This also doesn't print!")
1条答案
按热度按时间70gysomp1#
你必须明白,你的调用do
asyncio.gather
并不是“阻塞线程”,而是阻塞了 task。如果你想在checker-tasks运行的时候运行东西,并且web会话是打开的,只需触发gather
来同步那些任务,而不是你想运行东西的任务。如果任务永远不会返回,那么实际上你甚至不需要调用gather on-只需注意set或其他容器中的所有任务,并保留这些任务,这样它们就不会被取消引用,并且可以在时间到来时适当地取消。
除此之外,在没有使用gather等待任务时报告错误的唯一原因是,执行福尔斯脱离了打开会话的
with
语句块。你可以不使用
with
代码块,手动调用__enter__
和__exit__
方法--但是你也可以简单地重写代码,这样with block就在searate任务中,正如我上面提到的。在Python 3.11中,你可以使用任务组:它们将比gather
更好地工作,并且当父任务本身被取消时取消所有检查器任务。