rabbitmq FastApi Aio-Pika在重置后重新连接到rabbit Broker的问题

kd3sttzy  于 2023-04-30  发布在  RabbitMQ
关注(0)|答案(1)|浏览(339)

我正在使用FastApi(0.78)与AioPika(版本9.0.5),用于使用Rabbit消费和发布消息的服务。我的应用程序运行在k8s上,使用aws rabbit broker。Rabbit代理在集群中工作,有3个示例用于负载均衡消息。一周或两周aws会对broker进行一次维护,就像取下一个示例,更新它,然后再次运行到集群。在使用旧版本关闭代理的最后一个示例后,应用程序试图重新连接到代理,但它只连接到代理的新示例,而不是连接到使用者的队列。
我的代码实现从rabbit消费看起来像这样:
正在启动使用者的循环任务:

main.py

@app.on_event('startup')
async def startup() -> None:
    """Execute messages consumption on the application startup."""
    loop = asyncio.get_event_loop()
    task = loop.create_task(consume(loop))

    await task

正在连接到rabbit并消耗的方法

tasks.py

async def consume(loop: BaseEventLoop) -> Connection:
    """Consume messages from RabbitMQ.

    Args:
        loop: Event loop.

    Returns:
        Aio-pika connection.
    """
    logger.info("Starting messages consumption...")

    conn: Connection = await connect_robust(settings.rabbitmq_url, loop=loop)
    channel = await conn.channel()
    exchange = await channel.get_exchange(settings.rabbitmq_exchange)
    queue_bridge = await channel.declare_queue('queue_name')

    await queue_bridge.bind(exchange, 'queue_name')
    await queue_bridge.consume(on_message)

    return conn

async def on_message(message: IncomingMessage) -> None:
    """Process incoming message with report.

    Args:
        message: Message received from the RabbitMQ.
    """
    async with message.process(ignore_processed=True):
        message_body = json.loads(message.body.decode('utf-8'))

        logger.info("Received message on %s", message.routing_key, extra={
            'body': message_body,
        })

        if message.routing_key == 'routing.key.name':
            Do something with message payload

根据aio-pike文档,我应该使用connect_robust,它在失去连接后重新连接所有内容(连接,交换,队列),应用程序正在这样做:

Unexpected connection close from remote "amqps:SOMETHING", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Unexpected connection close from remote "amqps:SOMETHING", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.

应用程序成功连接到新的代理示例,但没有连接到交换和队列(没有使用者)。
我在aio-pika中读到了很多问题,其中说重新连接有问题,但现在已经解决了。我不知道我做错了什么。
希望你们能帮忙。

56lgkhnf

56lgkhnf1#

一个月前我也遇到过同样的问题,connect_robust并不是很健壮。我深吸一口气,切换到aiormq。在半天内,它被解决使用他们的显式连接。closing.add_done_callback().你必须自己在高层次上处理重新连接,但对我来说,重新调用我的整个init/connect序列只需要几行代码。
请注意,aio-pika已经在后台使用了aiormq(它不再使用pika)。

相关问题