取消fastAPI WebSocket内剩余的三个Nursery任务的正确方法?

q9yhzks0  于 2022-11-11  发布在  其他
关注(0)|答案(1)|浏览(154)

我对websockets还是个新手,我遇到了一个很难解决的问题。
我需要用FastAPI构建一个WebSocket端点,其中一组任务异步运行(为此,我使用了trio),每个任务通过websocket实时返回一个json值。
我已经设法满足了这些要求,我的代码如下所示:

@router.websocket('/stream')
async def runTasks(
        websocket: WebSocket
):
    # Initialise websocket
    await websocket.accept()
    while True:
        # Receive data
        tasks = await websocket.receive_json()
        # Run tasks asynchronously (limiting to 10 tasks at a time)
        async with trio.open_nursery() as nursery:
            limit = trio.CapacityLimiter(10)
            for task in tasks:
                nursery.start_soon(run_task, limit, task, websocket)

run_task看起来像这样:

async def run_task(limit, task, websocket):
    async with limit:
       # Complete task / transaction
       await websocket.send_json({"placeholder":"data"})

但是现在,给出了两个场景,我应该取消/跳过当前剩余的托儿所任务,但我有点不知所措,不知道如何才能做到这一点。
我给出的两个场景如下:

***场景1:**假设用户按下按钮时调用端点,如果用户在某些任务仍在运行时再次按下按钮,则应取消或跳过这些任务,并重新开始该过程
***方案2:**如果要关闭WebSocket,用户要刷新页面,或在Nursery任务完成之前退出,则应取消或跳过剩余任务

我试图了解更多关于Python - How to cancel a specific task spawned by a nursery in python-trio的内容,但我仍然不明白如何在进入新的Nursery之前取消以前的带有cancel scope的Nursery。我是否应该创建一个额外的任务来监视变量或其他东西,并在它发生变化时取消它?但是,一旦所有其他任务都完成了,我就不得不停止该任务

ubof19bj

ubof19bj1#

对于场景1:

1.在全局名称空间中创建字典,用于存储取消范围和事件(关键字:UUID,瓦尔:Tuple[trio.CancelScope, trio.Event]
1.为每个客户端分配唯一的UUID(客户端唯一的任何信息)
1.让客户端在连接开始时发送UUID
1.检查字典是否将该UUID作为关键字。如果存在,请取消范围并等待设置事件。
1.现在进行实际传输

对于场景2:

如果客户端没有显式关闭websocket,WebSocket不知道客户端是否断开连接。因此,我能想到的最好办法是强制超时,并在每次传输时等待客户端的响应。(这使得这种方法有点低效)。
可能最好进行一次具有容错能力的定期检查,比如每5分钟检查一次,最多允许2次连续超时--但为了简单起见,每次传输都强制超时。
下面是以上想法的演示代码。

客户代码:

由于我不知道客户端代码是什么样子的,我只是做了一些客户端来测试你的关注点。
这是一个有点bug,但我没有学习js -请不要判断客户端代码太严重!

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Websocket test</title>
</head>
<body>
    <button id="start">Start connection</button>
    <button id="close" disabled>Close connection</button>
    <input type="text" id="input_" value="INPUT_YOUR_UUID">

    <div id="state">Status: Waiting for connection</div>

    <script>
        let state = document.getElementById("state")
        let start_btn = document.getElementById("start")
        let close_btn = document.getElementById("close")
        let input_ = document.getElementById("input_")

        function sleep(sec) {
            state.textContent = `Status: sleeping ${sec} seconds`
            return new Promise((func) => setTimeout(func, sec * 1000))
        }

        function websocket_test() {
            return new Promise((resolve, reject) => {
                let socket = new WebSocket("ws://127.0.0.1:8000/stream")

                socket.onopen = function () {
                    state.textContent = "Status: Sending UUID - " + input_.value
                    socket.send(input_.value)
                    close_btn.disabled = false
                    close_btn.onclick = function () {socket.close()}
                }
                socket.onmessage = function (msg) {
                    state.textContent = "Status: Message Received - " + msg.data
                    socket.send("Received")
                }
                socket.onerror = function (error) {
                    reject(error)
                    state.textContent = "Status: Error encountered"
                }
                socket.onclose = function () {
                    state.textContent = "Status: Connection Stopped"
                    close_btn.disabled = true
                }
            })
        }

        start_btn.onclick = websocket_test

    </script>
</body>
</html>

服务器代码:

在以前的测试中,我看到服务器抛出超时,但无法再现它-如果对行为有信心,您可能不需要trio.fail_afterexcept trio.TooSlowError部分。

"""
Nursery cancellation demo
"""
import itertools
from typing import Dict, Tuple

import trio
import fastapi
import hypercorn
from hypercorn.trio import serve

GLOBAL_NURSERY_STORAGE: Dict[str, Tuple[trio.CancelScope, trio.Event]] = {}
TIMEOUT = 5

router = fastapi.APIRouter()

@router.websocket('/stream')
async def run_task(websocket: fastapi.WebSocket):
    # accept and receive UUID
    # Replace UUID with anything client-specific
    await websocket.accept()
    uuid_ = await websocket.receive_text()

    print(f"[{uuid_}] CONNECTED")

    # check if nursery exist in session, if exists, cancel it and wait for it to end.
    if uuid_ in GLOBAL_NURSERY_STORAGE:
        print(f"[{uuid_}] STOPPING NURSERY")
        cancel_scope, event = GLOBAL_NURSERY_STORAGE[uuid_]
        cancel_scope.cancel()
        await event.wait()

    # create new event, and start new nursery.
    cancel_done_event = trio.Event()

    async with trio.open_nursery() as nursery:
        # save ref
        GLOBAL_NURSERY_STORAGE[uuid_] = nursery.cancel_scope, cancel_done_event

        try:
            for n in itertools.count(0, 1):
                nursery.start_soon(task, n, uuid_, websocket)
                await trio.sleep(1)

                # wait for client response
                with trio.fail_after(TIMEOUT):
                    recv = await websocket.receive_text()
                    print(f"[{uuid_}] RECEIVED {recv}")

        except trio.TooSlowError:
            # client possibly left without proper disconnection, due to network issue
            print(f"[{uuid_}] CLIENT TIMEOUT")

        except fastapi.websockets.WebSocketDisconnect:
            # client performed proper disconnection
            print(f"[{uuid_}] CLIENT DISCONNECTED")

    # fire event, and pop reference if any.
    cancel_done_event.set()
    GLOBAL_NURSERY_STORAGE.pop(uuid_, None)
    print(f"[{uuid_}] NURSERY STOPPED & REFERENCE DROPPED")

async def task(text, uuid_, websocket: fastapi.WebSocket):
    await websocket.send_text(str(text))
    print(f"[{uuid_}] SENT {text}")

if __name__ == '__main__':
    cornfig = hypercorn.Config()
    # cornfig.bind = "ws://127.0.0.1:8000"
    trio.run(serve, router, cornfig)

运行输出示例:

客户端

服务器

[2022-01-31 21:23:12 +0900] [17204] [INFO] Running on http://127.0.0.1:8000 (CTRL + C to quit)
[2] CONNECTED      < start connection on tab 2
[2] SENT 0
[2] RECEIVED Received
[2] SENT 1
[2] RECEIVED Received
[2] SENT 2
[2] RECEIVED Received
[2] SENT 3
[2] RECEIVED Received
[2] SENT 4
[1] CONNECTED      < start connection on tab 1
[1] SENT 0
[2] RECEIVED Received
[2] SENT 5
[1] RECEIVED Received
[1] SENT 1
...
[2] SENT 18
[1] RECEIVED Received
[1] SENT 14
[2] RECEIVED Received
[2] SENT 19
[1] CLIENT DISCONNECTED      < closed connection on tab 1
[1] NURSERY STOPPED & REFERENCE DROPPED      < tab 1 nursery terminated
[2] RECEIVED Received
[2] SENT 20
[2] RECEIVED Received
[2] SENT 21
[1] CONNECTED      < start connection on tab 1
[1] SENT 0
[2] RECEIVED Received
[2] SENT 22
[1] RECEIVED Received
...
[2] SENT 26
[1] RECEIVED Received
[1] SENT 5
[2] CLIENT DISCONNECTED      < tab 2 closed
[2] NURSERY STOPPED & REFERENCE DROPPED      < tab 2 nursery terminated
[1] RECEIVED Received
[1] SENT 6
[1] RECEIVED Received
[1] SENT 7
[1] RECEIVED Received
[1] SENT 8
[1] CONNECTED      < start another connection on tab 1 without closing
[1] STOPPING NURSERY      < previous connection on tab 1 terminating
[1] NURSERY STOPPED & REFERENCE DROPPED      < previous connection on tab 1 terminated
[1] SENT 0
[1] RECEIVED Received
[1] SENT 1
...
[1] RECEIVED Received
[1] SENT 8
[1] CLIENT DISCONNECTED      < Refreshed tab 1
[1] NURSERY STOPPED & REFERENCE DROPPED      < tab 1 nursery terminated
...

相关问题