如何使用FastAPI作为RabbitMQ(RPC)的使用者

iecba09b  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(368)

示例here显示了如何使用远程过程调用(RPC)在python中创建客户端和服务器。
但我无法想象FastAPI服务如何成为使用pika for RabbitMQ来消费来自RCP客户机的请求的服务器。
任何Web服务都将通过显式调用它们来请求,但是,我无法想象如何在Web服务内部集成RabbitMQ消费者。
另一方面,对于客户端来说,通过显式调用Web服务,您可以发布对队列see this example的请求,这样做很容易
有什么帮助吗?或者是一个好的开始?

k5hmc34c

k5hmc34c1#

您可以将aio_pika与RPC模式一起使用,并执行以下操作:

服务1(使用)

循环使用:


# app/__init__.py

from fastapi import FastAPI
from app.rpc import consume

app = FastAPI()

...

@app.on_event('startup')
def startup():
    loop = asyncio.get_event_loop()
    # use the same loop to consume
    asyncio.ensure_future(consume(loop))

...

创建要从另一个服务调用的连接、通道和注册远程方法:


# app/rpc.py

from aio_pika import connect_robust
from aio_pika.patterns import RPC

from app.config import config

__all__ = [
    'consume'
]

def remote_method():
    # DO SOMETHING
    # Move this method along with others to another place e.g. app/rpc_methods
    # I put it here for simplicity
    return 'It works!'

async def consume(loop):
    connection = await connect_robust(config.AMQP_URI, loop=loop)
    channel = await connection.channel()
    rpc = await RPC.create(channel)

    # Register your remote method
    await rpc.register('remote_method', remote_method, auto_delete=True)
    return connection

这就是您需要使用和响应的全部内容,现在让我们看看调用此远程方法的第二个服务。

服务2(调用远程方法)

让我们首先创建RPC中间件,以便轻松地管理和访问RPC对象,从而从API函数调用远程方法:


# app/utils/rpc_middleware.py

import asyncio

from fastapi import Request, Response

from aio_pika import connect_robust
from aio_pika.patterns import RPC

from app.config import config

__all__ = [
    'get_rpc',
    'rpc_middleware'
]

async def rpc_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        # You can also pass a loop as an argument. Keep it here now for simplicity
        loop = asyncio.get_event_loop()
        connection = await connect_robust(config.AMQP_URI, loop=loop)
        channel = await connection.channel()
        request.state.rpc = await RPC.create(channel)
        response = await call_next(request)
    finally:

        # UPD: just thought that we probably want to keep queue and don't
        # recreate it for each request so we can remove this line and move
        # connection, channel and rpc initialisation out from middleware 
        # and do it once on app start

        # Also based of this: https://github.com/encode/starlette/issues/1029
        # it's better to create ASGI middleware instead of HTTP
        await request.state.rpc.close()
    return response

# Dependency to use rpc inside routes functions

def get_rpc(request: Request):
    rpc = request.state.rpc
    return rpc

应用RPC中间件:


# app/__init__.py

from app.utils import rpc_middleware

...

app.middleware('http')(rpc_middleware)

...

通过API函数中的依赖关系使用RPC对象:


# app/api/whatever.py

from aio_pika.patterns import RPC

from app.utils import get_rpc

...

@router.get('/rpc')
async def rpc_test(rpc: RPC = Depends(get_rpc)):
    response = await rpc.proxy.remote_method()
    ...

添加一些日志记录来跟踪两个服务中发生的情况。您还可以将两个服务中的RPC逻辑合并为一个,以便能够在同一个服务中使用和调用远程方法。
希望能对你的基本想法有所帮助。

相关问题