- 背景:我正在创建一个websocket接口,它控制ProcessPoolExecutor中的一些CPU绑定**进程。这些进程使用队列定期发送客户端更新,并且仅在收到"停止"消息时终止(长时间运行)。
- 问题:**在阅读文档后,我无法使ProcessPoolExecutor工作,以便a)套接字保持畅通(并行调用update_cidca()所需)。b)可以使用websocket消息终止进程。
我错过了什么?默认的线程执行器工作,但在这种情况下,MP并行更快(最小的I/O)。有用信息(没有websockets):How to do multiprocessing in FastAPI.线程处理而不是多处理。Make an CPU-bound task asynchronous for FastAPI WebSockets
- 示例**
@app.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
pool = ProcessPoolExecutor() #used instead of default/threadpool
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
s = CICADA(clip_class, queue)
await websocket.accept()
while True:
data = await websocket.receive_json()
#should be non-blocking and terminate on "stop" message
loop.run_in_executor(pool, update_cicada(data, s, queue))
#update_cicada adds to queue thus updating client
result = await queue.get()
websocket.send_json(result) #so process can update client whithout terminating
2条答案
按热度按时间jk9hmnmh1#
我两天前读到这个问题,无法摆脱它。这里有多个概念在起作用,有些(如果不是全部)相当复杂。我为自己创建了一个原型来完全理解正在发生的事情,这是一个工作示例。我添加了许多注解来解释正在发生的事情或原因。
几个指针虽然 bat 的权利:
asyncio.Queue
不是线程或进程安全的。这意味着,当跨进程共享这样的对象时,你可以(很可能)得到一个损坏的状态。这样的队列有利于跨Tasks
共享状态,因为它们都在事件循环中的同一个线程上运行。multiprocessing.Queue
是线程和进程安全的,但是你需要一个Manager()
来处理细节。它本质上创建了另一个子进程来处理与队列的所有通信(来自其他进程)。asyncio.sleep()
将控制权交还给事件循环,以允许事件循环中的其他任务继续处理,否则,它将阻塞无限while循环中的当前任务。我用4个并发请求测试了下面的代码(我从命令行使用
wscat
)请注意,我绝不是asyncio
或multiprocessing
方面的Maven,所以我并不认为这些是最佳实践。sgtfey8w2#
我还遇到了一些问题,为多个客户端并行实现带有Websocket的FastApi。如文档中所述(在“提示”部分的最后),您应该看看starlette websocket example以了解更复杂的实现。
他们有一个完整的工作示例。