我正在使用FastApi(0.78)与AioPika(版本9.0.5),用于使用Rabbit消费和发布消息的服务。我的应用程序运行在k8s上,使用aws rabbit broker。Rabbit代理在集群中工作,有3个示例用于负载均衡消息。一周或两周aws会对broker进行一次维护,就像取下一个示例,更新它,然后再次运行到集群。在使用旧版本关闭代理的最后一个示例后,应用程序试图重新连接到代理,但它只连接到代理的新示例,而不是连接到使用者的队列。
我的代码实现从rabbit消费看起来像这样:
正在启动使用者的循环任务:
main.py
@app.on_event('startup')
async def startup() -> None:
"""Execute messages consumption on the application startup."""
loop = asyncio.get_event_loop()
task = loop.create_task(consume(loop))
await task
正在连接到rabbit并消耗的方法
tasks.py
async def consume(loop: BaseEventLoop) -> Connection:
"""Consume messages from RabbitMQ.
Args:
loop: Event loop.
Returns:
Aio-pika connection.
"""
logger.info("Starting messages consumption...")
conn: Connection = await connect_robust(settings.rabbitmq_url, loop=loop)
channel = await conn.channel()
exchange = await channel.get_exchange(settings.rabbitmq_exchange)
queue_bridge = await channel.declare_queue('queue_name')
await queue_bridge.bind(exchange, 'queue_name')
await queue_bridge.consume(on_message)
return conn
async def on_message(message: IncomingMessage) -> None:
"""Process incoming message with report.
Args:
message: Message received from the RabbitMQ.
"""
async with message.process(ignore_processed=True):
message_body = json.loads(message.body.decode('utf-8'))
logger.info("Received message on %s", message.routing_key, extra={
'body': message_body,
})
if message.routing_key == 'routing.key.name':
Do something with message payload
根据aio-pike文档,我应该使用connect_robust
,它在失去连接后重新连接所有内容(连接,交换,队列),应用程序正在这样做:
Unexpected connection close from remote "amqps:SOMETHING", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Unexpected connection close from remote "amqps:SOMETHING", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
应用程序成功连接到新的代理示例,但没有连接到交换和队列(没有使用者)。
我在aio-pika中读到了很多问题,其中说重新连接有问题,但现在已经解决了。我不知道我做错了什么。
希望你们能帮忙。
1条答案
按热度按时间56lgkhnf1#
一个月前我也遇到过同样的问题,connect_robust并不是很健壮。我深吸一口气,切换到aiormq。在半天内,它被解决使用他们的显式连接。closing.add_done_callback().你必须自己在高层次上处理重新连接,但对我来说,重新调用我的整个init/connect序列只需要几行代码。
请注意,aio-pika已经在后台使用了aiormq(它不再使用pika)。