如何使用非阻塞fastapi websockets进行多处理

gg58donl  于 2023-01-09  发布在  其他
关注(0)|答案(2)|浏览(410)
    • 背景:我正在创建一个websocket接口,它控制ProcessPoolExecutor中的一些CPU绑定**进程。这些进程使用队列定期发送客户端更新,并且仅在收到"停止"消息时终止(长时间运行)。
    • 问题:**在阅读文档后,我无法使ProcessPoolExecutor工作,以便a)套接字保持畅通(并行调用update_cidca()所需)。b)可以使用websocket消息终止进程。

我错过了什么?默认的线程执行器工作,但在这种情况下,MP并行更快(最小的I/O)。有用信息(没有websockets):How to do multiprocessing in FastAPI.线程处理而不是多处理。Make an CPU-bound task asynchronous for FastAPI WebSockets

    • 示例**
@app.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
    pool = ProcessPoolExecutor() #used instead of default/threadpool
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    s = CICADA(clip_class, queue)

    await websocket.accept()

    while True:
        data = await websocket.receive_json()

        #should be non-blocking and terminate on "stop" message
        loop.run_in_executor(pool, update_cicada(data, s, queue))

        #update_cicada adds to queue thus updating client
        result = await queue.get()
        websocket.send_json(result) #so process can update client whithout terminating
jk9hmnmh

jk9hmnmh1#

我两天前读到这个问题,无法摆脱它。这里有多个概念在起作用,有些(如果不是全部)相当复杂。我为自己创建了一个原型来完全理解正在发生的事情,这是一个工作示例。我添加了许多注解来解释正在发生的事情或原因。
几个指针虽然 bat 的权利:

  • asyncio.Queue不是线程或进程安全的。这意味着,当跨进程共享这样的对象时,你可以(很可能)得到一个损坏的状态。这样的队列有利于跨Tasks共享状态,因为它们都在事件循环中的同一个线程上运行。
  • multiprocessing.Queue是线程和进程安全的,但是你需要一个Manager()来处理细节。它本质上创建了另一个子进程来处理与队列的所有通信(来自其他进程)。
  • 确保你的代码没有阻塞其他请求,在下面的例子中,我使用asyncio.sleep()将控制权交还给事件循环,以允许事件循环中的其他任务继续处理,否则,它将阻塞无限while循环中的当前任务。

我用4个并发请求测试了下面的代码(我从命令行使用wscat)请注意,我绝不是asynciomultiprocessing方面的Maven,所以我并不认为这些是最佳实践。

import asyncio
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
from queue import Empty
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import time

app = FastAPI()

#do not re-create the pool with every request, only create it once
pool = ProcessPoolExecutor()

def long_running_task(q: mp.Queue) -> str:
    # This would be your update_cicada function
    for i in range(5):
        #this represents some blocking IO
        time.sleep(3)
        q.put(f"'result': 'Iteration {i}'")
    return "done!"

@app.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
    loop = asyncio.get_event_loop()
    
    #To use Queue's across processes, you need to use the mp.Manager()
    m = mp.Manager()
    q = m.Queue()
    
    await websocket.accept()
    
    #run_in_executor will return a Future object. Normally, you would await
    #such an method but we want a bit more control over it. 
    result = loop.run_in_executor(pool, long_running_task, q)
    while True:
        
        #None of the coroutines called in this block (e.g. send_json()) 
        # will yield back control. asyncio.sleep() does, and so it will allow
        # the event loop to switch context and serve multiple requests 
        # concurrently.
        await asyncio.sleep(0)

        try:
            #see if our long running task has some intermediate result.
            # Will result None if there isn't any.
            q_result = q.get(block=False)
        except Empty:
            #if q.get() throws Empty exception, then nothing was 
            # available (yet!).
            q_result = None

        #If there is an intermediate result, let's send it to the client.
        if q_result:
            try:
                await websocket.send_json(q_result)
            except WebSocketDisconnect:
                #This happens if client has moved on, we should stop the long
                #  running task
                result.cancel()
                #break out of the while loop.
                break
        
        #We want to stop the connection when the long running task is done.
        if result.done():
            try:
                await websocket.send_json(result.result())
                await websocket.close()  
            except WebSocketDisconnect:
                #This happens if client has moved on, we should stop the long
                #  running task
                result.cancel()
            finally:
                #Make sure we break out of the infinte While loop.
                break
            
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000,  )
sgtfey8w

sgtfey8w2#

我还遇到了一些问题,为多个客户端并行实现带有Websocket的FastApi。如文档中所述(在“提示”部分的最后),您应该看看starlette websocket example以了解更复杂的实现。
他们有一个完整的工作示例。

相关问题