在不同线程中管理RabbitMQ自动删除队列,Python

ioekq8ef  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(1)|浏览(171)

我想知道这是否是在不同线程中管理auto_delete队列的正确方法(主要用于测试连接关闭时不希望RabbitMQ队列停留的问题)

import pika
from threading import Thread

class ConsumerThread(Thread):

    def __init__(self, callback, queue):
        Thread.__init__(self)
        self.setDaemon(True)

        self.callback = callback
        self.queue = queue

    def run(self):
        # stablish connection
        connection = pika.BlockingConnection(pika.ConnectionParameters(CONNECTION['address'], CONNECTION['port'], CONNECTION['vhost'], CONNECTION['credentials']))
        channel = connection.channel()

        # create the auto-delete queue
        channel.queue_declare(queue=self.queue, auto_delete=True)

        # start consuming
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(self.callback, queue=self.queue)
        channel.start_consuming()

class Factory:

    def __init__(self):
        self.queue_init = "init.queue"
        self.queue_start = "start.queue"

        threads = [ConsumerThread(self.init_callback, self.queue_init), ConsumerThread(self.start_callback, self.queue_start)]
        for t in threads:
            t.start()

    def init_callback(self, ch, method, properties, body):
        # doing something

    def start_callback(self, ch, method, properties, body):
        # doing something
tf7tbtn2

tf7tbtn21#

Pika不是线程安全的。您必须确保BlockingConnection方法调用发生在连接和通道所在的同一线程上。根据你的代码,我不确定这会发生,因为你在Factory类中调用回调,这看起来很奇怪。为什么不把这些方法放在ConsumerThread中呢?
Pika 0.12和更高版本将包含一个add_callback_threadsafe方法,该方法将调度一个方法在ioloop线程上执行。

相关问题