RabbitMQ/Pika使用evenlet阻塞消费者

hrirmatl  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(2)|浏览(184)

我正在开发一个更大的应用程序,它需要eventlet,现在也需要rabbitMQ。看起来eventlet导致pika消费者线程阻止了其他工作线程的执行。我知道Pika不被认为是线程安全的,所以我在它自己的线程中拥有一切,包括连接。我假设阻塞连接应该只阻塞消费者线程。如何让pika和eventlet一起工作?在下面的示例中,worker线程从不打印任何内容,但注解出eventlet.monkey_patch()允许两个线程都执行。

import threading
import pika

import eventlet
eventlet.monkey_patch()

def callback(ch, method, properties, body):
    print body
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consumer():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test', durable=True,
                          exclusive=False, auto_delete=False)
    channel.basic_consume(callback, queue='test')
    channel.start_consuming()

def start_consumer_thread():
    # initialize a listener thread
    consumer_thread = threading.Thread(target=consumer)
    consumer_thread.start()

def worker():
    start_consumer_thread()
    for x in range(1,10000):
        print x

x = threading.Thread(target=worker())
x.start()
laawzig2

laawzig21#

Pika和eventlet.monkey_patch不兼容。如果可能的话,您将不得不在不修补系统调用的情况下使用eventlet

vfhzx4xs

vfhzx4xs2#

我能够让一个pika消费者通过尽早的monkey补丁来使用eventlet,并以补丁的方式导入pika
首先导入并修补stdlib:

import eventlet
eventlet.monkey_patch()

然后导入并修补pika本身:

pika = eventlet.import_patched('pika')

我将此导入策略与asynchronous_consumer_example结合使用:https://pika.readthedocs.io/en/stable/examples/asynchronous_consumer_example.html,并使用eventlet原语而不是threading来实现非阻塞消费者。

相关问题