python-3.x CancelledError异常处理程序未触发

7gyucuyw  于 2023-04-22  发布在  Python
关注(0)|答案(2)|浏览(221)

我的任务被取消的异常处理程序似乎在简单的情况下触发,但在使用多个任务或asyncio.gather时不会触发。
示例代码:

import asyncio

async def sleep_func(statement, time, sabotage=False):
    print(f"Starting: {statement}")
    try:
        if sabotage:
            tasks = [
                asyncio.sleep(1000),
                asyncio.sleep(1000),
                asyncio.sleep(1000),
            ]
            await asyncio.gather(*tasks)
        await asyncio.sleep(time)
    except asyncio.CancelledError as e:
        print(f"cancelled {statement}! - {str(e)}")
    except Exception as e:
        print(f"Unhandled exception - {str(e)}")
    print(f"Ending: {statement}")

async def main():
    calls = [
        asyncio.ensure_future(sleep_func("eat", 3)),
        asyncio.ensure_future(sleep_func("pray", 8)),
        asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
    ]
    print("starting!")
    finished, unfinished = await asyncio.wait(calls, timeout=6)
    for task in unfinished:
        task.cancel("This message should be shown when task is cancelled")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

我的主要职能有三项任务:

[
    asyncio.ensure_future(sleep_func("eat", 3)),
    asyncio.ensure_future(sleep_func("pray", 8)),
    asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
]

预期:

  • 我的第一个任务没有被取消(这个工作)
  • 我的第二个任务被取消,并在

CancelledError处理程序正确(这有效)

  • 我的第二个任务被取消,并在

CancelledError处理程序正确(这不起作用)
为什么最后一个任务没有触发CancelledError处理程序,它使用asyncio.gather并有一堆子任务?
输出:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray

预期输出:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
cancelled love! - This message should be shown when task is cancelled
Ending: love
vulvrdjw

vulvrdjw1#

您没有看到love任务取消的原因是您没有等待它。
当你到达取消for-循环的末尾时,任务还没有被取消,它们只是正在取消,即正在被取消的过程中。
调用Task.cancel方法不会立即取消任务。
安排CancelledError异常在事件循环的下一个周期被抛出到 Package 的协程中。
为了让下一个事件循环真正开始,你需要一个await表达式。这将允许一个上下文切换到一个couroutines发生,并在那里实际引发CancelledError
但是你的main协程结束时没有另一个await,因此没有通过事件循环进行上下文切换的可能性。这就是为什么在Task.cancel文档下的示例代码片段中,要取消的任务在最后被
等待**。
因此,获得所需输出的最简单方法是在for-loop中的task.cancel(...)下面添加await task

...

async def main():
    calls = [
        asyncio.ensure_future(sleep_func("eat", 3)),
        asyncio.ensure_future(sleep_func("pray", 8)),
        asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
    ]
    print("starting!")
    finished, unfinished = await asyncio.wait(calls, timeout=6)
    for task in unfinished:
        task.cancel("This message should be shown when task is cancelled")
        await task  # <---

至于为什么你调用cancel的两个任务中的一个实际上设法将CancelledError发送到它的协程,我不太了解循环的run_until_complete方法的内部结构,但我怀疑它确实允许在返回之前进行另一次上下文切换。但我推测这只是一个实现细节,根本不可靠(如您的示例所示),当取消的任务数量增加时,甚至更少。
顺便说一句,你可能不应该再使用loop = asyncio.get_event_loop()模式了,因为它已经被弃用了。运行异步main函数的规范方法是通过asyncio.run
此外,当您有多个任务要取消时,模式通常是在请求取消后asyncio.gather它们。
最后,正如在注解中提到的,最好的做法是在协程中重新引发捕获的CancelledError。然后,您可以通过简单地将return_exceptions=True传递给asyncio.gather来避免它在main函数中的“进一步链”。
因此,我建议对示例代码进行的更改如下所示:

import asyncio

async def sleep_func(statement, time, sabotage=False):
    print(f"Starting: {statement}")
    try:
        if sabotage:
            tasks = [
                asyncio.sleep(1000),
                asyncio.sleep(1000),
                asyncio.sleep(1000),
            ]
            await asyncio.gather(*tasks)
        await asyncio.sleep(time)
    except asyncio.CancelledError as e:
        print(f"cancelled {statement}! - {str(e)}")
        raise e  # <---
    except Exception as e:
        print(f"Unhandled exception - {str(e)}")
    finally:  # <---
        print(f"Ending: {statement}")

async def main():
    calls = [
        asyncio.ensure_future(sleep_func("eat", 3)),
        asyncio.ensure_future(sleep_func("pray", 8)),
        asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
    ]
    print("starting!")
    finished, unfinished = await asyncio.wait(calls, timeout=6)
    for task in unfinished:
        task.cancel("This message should be shown when task is cancelled")
    await asyncio.gather(*unfinished, return_exceptions=True)  # <---

if __name__ == "__main__":
    asyncio.run(main())  # <---

输出:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
cancelled love! - This message should be shown when task is cancelled
Ending: love

PS:从Python 3.11开始,我们有asyncio.TaskGroup类,它为这种情况提供了一个方便的上下文管理器。利用它,我们可以像这样编写main函数:

...

async def main():
    print("starting!")
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(sleep_func("eat", 3)),
            tg.create_task(sleep_func("pray", 8)),
            tg.create_task(sleep_func("love", 10, sabotage=True)),
        ]
        finished, unfinished = await asyncio.wait(tasks, timeout=6)
        for task in unfinished:
            task.cancel("This message should be shown when task is cancelled")

注意不需要在这里等待任务,因为
当上下文管理器退出时,等待所有任务。
我们也不需要担心传播的CancelledError,因为它不会“泄漏”出任务组上下文。

vqlkdk9b

vqlkdk9b2#

问题

这里的问题似乎是,没有时间让收集的任务实际完成取消并将CancellationError抛出到 Package try_except_sleeper
cancel仅请求取消并立即返回文档

溶液

await asyncio.wait(unfinished, timeout=1)添加到main函数的最后一行。这样协程将有足够的时间来处理取消请求,输出将是:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
cancelled love! -

顺便说一句:你看到取消消息没有被传递到 Package 器-我不知道为什么,可能是一个bug。

相关问题