我对websockets还是个新手,我遇到了一个很难解决的问题。
我需要用FastAPI构建一个WebSocket端点,其中一组任务异步运行(为此,我使用了trio),每个任务通过websocket实时返回一个json值。
我已经设法满足了这些要求,我的代码如下所示:
@router.websocket('/stream')
async def runTasks(
websocket: WebSocket
):
# Initialise websocket
await websocket.accept()
while True:
# Receive data
tasks = await websocket.receive_json()
# Run tasks asynchronously (limiting to 10 tasks at a time)
async with trio.open_nursery() as nursery:
limit = trio.CapacityLimiter(10)
for task in tasks:
nursery.start_soon(run_task, limit, task, websocket)
run_task
看起来像这样:
async def run_task(limit, task, websocket):
async with limit:
# Complete task / transaction
await websocket.send_json({"placeholder":"data"})
但是现在,给出了两个场景,我应该取消/跳过当前剩余的托儿所任务,但我有点不知所措,不知道如何才能做到这一点。
我给出的两个场景如下:
***场景1:**假设用户按下按钮时调用端点,如果用户在某些任务仍在运行时再次按下按钮,则应取消或跳过这些任务,并重新开始该过程
***方案2:**如果要关闭WebSocket,用户要刷新页面,或在Nursery任务完成之前退出,则应取消或跳过剩余任务
我试图了解更多关于Python - How to cancel a specific task spawned by a nursery in python-trio的内容,但我仍然不明白如何在进入新的Nursery之前取消以前的带有cancel scope的Nursery。我是否应该创建一个额外的任务来监视变量或其他东西,并在它发生变化时取消它?但是,一旦所有其他任务都完成了,我就不得不停止该任务
1条答案
按热度按时间ubof19bj1#
对于场景1:
1.在全局名称空间中创建字典,用于存储取消范围和事件(关键字:
UUID
,瓦尔:Tuple[trio.CancelScope, trio.Event]
个1.为每个客户端分配唯一的UUID(客户端唯一的任何信息)
1.让客户端在连接开始时发送UUID
1.检查字典是否将该UUID作为关键字。如果存在,请取消范围并等待设置事件。
1.现在进行实际传输
对于场景2:
如果客户端没有显式关闭websocket,WebSocket不知道客户端是否断开连接。因此,我能想到的最好办法是强制超时,并在每次传输时等待客户端的响应。(这使得这种方法有点低效)。
可能最好进行一次具有容错能力的定期检查,比如每5分钟检查一次,最多允许2次连续超时--但为了简单起见,每次传输都强制超时。
下面是以上想法的演示代码。
客户代码:
由于我不知道客户端代码是什么样子的,我只是做了一些客户端来测试你的关注点。
这是一个有点bug,但我没有学习js -请不要判断客户端代码太严重!
服务器代码:
在以前的测试中,我看到服务器抛出超时,但无法再现它-如果对行为有信心,您可能不需要
trio.fail_after
和except trio.TooSlowError
部分。运行输出示例:
客户端
服务器