python 如何在不阻塞的情况下运行多个定期协程?

5us2dqdw  于 2023-10-14  发布在  Python
关注(0)|答案(1)|浏览(79)

我正在写一个python脚本,每隔几分钟检查几个域的域名。我在while循环中为每个网站运行一个协程,并在检查完成后让它们睡觉。这是可行的,但我也希望能够取消它们。我遇到的问题是,当我在tagencio.gather()中等待这些协程时,它们阻塞了线程,因为它们从不返回结果。
如果去掉await asyncio.gather(*tasks.values(), return_exceptions=True),得到RuntimeError: Session is closed
我怎样才能在不阻塞线程的情况下运行它们?
这是代码的简化版本。我正在使用一个简单的aiohttp服务器进行测试。
服务器代码:

from aiohttp import web
import asyncio
import random

async def handle(request: web.Request) -> web.Response:
    await asyncio.sleep(random.randint(0, 3))
    return web.Response(text=f"Hello, from {request.rel_url.path}")

app = web.Application()
app.router.add_route('GET', '/{name}', handle)

web.run_app(app)

正常运行时间检查器代码:

import asyncio
import aiohttp

LIMIT = 2

async def check_uptime_coro(session: aiohttp.ClientSession, url: str, semaphore: asyncio.BoundedSemaphore) -> None:
    while True:
        try:
            async with semaphore:
                async with session.get(url) as response:
                    if response.status != 200:
                        print(f"error with {url} {response.status}")
                    else:
                        print(f"success with {url}")
            await asyncio.sleep(5)
        except Exception as e:
            print(f"error with {url} {e}")

async def main() -> None:
    urls = [f"http://localhost:8080/{x}" for x in range(0, 10)]
    tasks = {}
    semaphore = asyncio.BoundedSemaphore(LIMIT)
    try:
        async with aiohttp.ClientSession() as session:
            for url in urls:
                tasks[url] = asyncio.create_task(
                    check_uptime_coro(session, url, semaphore))

            await asyncio.gather(*tasks.values(), return_exceptions=True)

        print("This doesn't print!")
    except Exception as e:
        print(f"error! {e}")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("This also doesn't print!")
70gysomp

70gysomp1#

你必须明白,你的调用do asyncio.gather并不是“阻塞线程”,而是阻塞了 task。如果你想在checker-tasks运行的时候运行东西,并且web会话是打开的,只需触发gather来同步那些任务,而不是你想运行东西的任务。
如果任务永远不会返回,那么实际上你甚至不需要调用gather on-只需注意set或其他容器中的所有任务,并保留这些任务,这样它们就不会被取消引用,并且可以在时间到来时适当地取消。
除此之外,在没有使用gather等待任务时报告错误的唯一原因是,执行福尔斯脱离了打开会话的with语句块。
你可以不使用with代码块,手动调用__enter____exit__方法--但是你也可以简单地重写代码,这样with block就在searate任务中,正如我上面提到的。在Python 3.11中,你可以使用任务组:它们将比gather更好地工作,并且当父任务本身被取消时取消所有检查器任务。

import asyncio
import aiohttp

LIMIT = 2

async def check_uptime_coro(session: aiohttp.ClientSession, url: str, semaphore: asyncio.BoundedSemaphore) -> None:
    while True:
        try:
            async with semaphore:
                async with session.get(url) as response:
                    if response.status != 200:
                        print(f"error with {url} {response.status}")
                    else:
                        print(f"success with {url}")
            await asyncio.sleep(5)
        except Exception as e:
            print(f"error with {url} {e}")

async def check_uptime_master():
        urls = [f"http://localhost:8080/{x}" for x in range(0, 10)]
        tasks = {}
        semaphore = asyncio.BoundedSemaphore(LIMIT)
        async with aiohttp.ClientSession() as session:
            for url in urls:
                tasks[url] = asyncio.create_task(
                    check_uptime_coro(session, url, semaphore))

            await asyncio.gather(*tasks.values(), return_exceptions=True)

async def main() -> None:
    try:
        checker_task = asyncio.create_task(check_uptime_master())
        await asyncio.sleep(0)  # give the asyncio loop  a chance to fire-up the subtasks
        print("This now, does print!")
    except Exception as e:
        print(f"error! {e}")
    # go on with your code on the main task, DOn't forget to yield to the loop
    # so subtasks can run!
    ...

if __name__ == "__main__":
    asyncio.run(main()) # this is the new recomended way to fire asyncio
    # loop = asyncio.get_event_loop()  #<- obsolete
    # loop.run_until_complete(main())  # <- obsolete
    print("This also doesn't print - and will not until yo write code that explictly cancels the `checker_task` above")

相关问题