Python - RabbitMQ Pika消费者-如何使用异步函数作为回调函数

9rygscc1  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(179)

我有下面的代码,我在其中初始化了一个侦听队列的使用者。

consumer = MyConsumer()
consumer.declare_queue(queue_name="my-jobs")
consumer.declare_exchange(exchange_name="my-jobs")
consumer.bind_queue(
    exchange_name="my-jobs", queue_name="my-jobs", routing_key="jobs"
)
consumer.consume_messages(queue="my-jobs", callback=consumer.consume)

问题在于consume方法的定义如下:

async def consume(self, channel, method, properties, body):

在consume方法中,我们需要等待异步函数,但是这会产生一个错误“coroutine is not waited”给consume函数。有没有办法在pika中使用异步函数作为回调函数呢?

vom3gejh

vom3gejh1#

我用@sync注解了我的回调,其中sync是:

def sync(f):
@functools.wraps(f)
def wrapper(*args,**kwargs):
    return asyncio.get_event_loop().run_until_complete(f(*args,**kwargs))
return wrapper

(发现它here用于celery ,但它对鼠兔也有效)

相关问题