如何在RabbitMQ中创建延迟队列?

oogrdqng  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(6)|浏览(196)

用Python、Pika和RabbitMQ创建延迟(或停放)队列的最简单方法是什么?我见过类似的questions,但没有一个适合Python。
我发现在设计应用程序时这是一个有用的想法,因为它允许我们限制需要再次重新排队的消息。
总是有这样的可能性,即您将收到比您能够处理的更多的消息,可能是HTTP服务器速度慢,或者数据库承受的压力过大。
我还发现,当在零容忍丢失邮件的情况下出现问题时,它非常有用,而重新排队无法处理的邮件可能会解决这个问题。它还可能导致邮件一遍又一遍地排队的问题。潜在地导致性能问题,并记录垃圾邮件。

jgwigjjp

jgwigjjp1#

我发现这在开发我的应用程序时非常有用。因为它为你提供了一个简单地重新排队消息的替代方法。这可以很容易地降低代码的复杂性,这是RabbitMQ中许多强大的隐藏特性之一。

步骤

首先,我们需要设置两个基本通道,一个用于主队列,一个用于延迟队列。在最后的示例中,我包括了几个附加的标志,它们并不是必需的,但却使代码更可靠;例如confirm deliverydelivery_modedurable。您可以在RabbitMQ manual中找到有关这些内容的更多信息。
在设置通道之后,我们向主通道添加一个绑定,可以使用该绑定将消息从延迟通道发送到我们的主队列。

channel.queue_bind(exchange='amq.direct',
                   queue='hello')

接下来,我们需要配置延迟通道,以便在消息过期后将其转发到主队列。

delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000,
  'x-dead-letter-exchange' : 'amq.direct',
  'x-dead-letter-routing-key' : 'hello'
})

这通常用于在特定持续时间后自动删除队列中的旧消息,但通过添加两个可选参数,我们可以更改此行为,并改为让此参数以毫秒为单位确定消息将在延迟队列中停留多长时间。

此变量允许我们在消息过期后将其传输到另一个队列,而不是完全删除它的默认行为。

此变量确定用于将消息从hello_delay传输到hello队列的Exchange。

正在发布到延迟队列

当我们设置完所有基本的Pika参数后,您只需使用basic publish向延迟队列发送一条消息。

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))

执行完脚本后,您应该会看到RabbitMQ管理模块中创建了以下队列。x1c 0d1x

示例。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))

# Create normal 'Hello World' type channel.

channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer

# messages from our delay queue.

channel.queue_bind(exchange='amq.direct',
                   queue='hello')

# Create our delay channel.

delay_channel = connection.channel()
delay_channel.confirm_delivery()

# This is where we declare the delay, and routing for our delay channel.

delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000, # Delay until the message is transferred in milliseconds.
  'x-dead-letter-exchange' : 'amq.direct', # Exchange used to transfer the message from A to B.
  'x-dead-letter-routing-key' : 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))

print " [x] Sent"
webghufk

webghufk2#

您可以使用RabbitMQ官方插件:x-延迟-消息
首先,下载并复制ez fileYour_rabbitmq_root_path/plugins
其次,启用插件(不需要重新启动服务器):

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

最后,发布带有“x-delay”标题的邮件,如下所示:

headers.put("x-delay", 5000);

注意事项:

它并不能保证你的消息的安全,因为如果你的消息在你的rabbitmq服务器停机时过期,不幸的是消息丢失了。所以当你使用这个方案时要小心
享受它和更多信息在rabbitmq-delayed-message-exchange

mlmc2os5

mlmc2os53#

仅供参考,如何在Spring 3.2.x中实现这一点。

<rabbit:queue name="delayQueue" durable="true" queue-arguments="delayQueueArguments"/>

<rabbit:queue-arguments id="delayQueueArguments">
  <entry key="x-message-ttl">
    <value type="java.lang.Long">10000</value>
  </entry>
  <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
  <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
</rabbit:queue-arguments>

<rabbit:fanout-exchange name="finalDestinationTopic">
  <rabbit:bindings>
    <rabbit:binding queue="finalDestinationQueue"/>
  </rabbit:bindings>
</rabbit:fanout-exchange>
kt06eoxx

kt06eoxx4#

NodeJS实作。
从代码上看一切都很清楚。希望它能节省一些人的时间。

var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});

// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
      deadLetterExchange: "my_final_delayed_exchange",
      messageTtl: 5000, // 5sec
}, function (err, q) {
      ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
      ch.bindQueue(q.queue, "my_final_delayed_exchange", '');

      ch.consume(q.queue, function (msg) {
          console.log("delayed - [x] %s", msg.content.toString());
      }, {noAck: true});
});
aurhwmvo

aurhwmvo5#

Rabbit队列中消息可以用两种方式延迟-使用队列TTL -使用消息TTL如果队列中所有消息都要延迟固定时间,则使用队列TTL如果每个消息都要延迟不同时间,则使用消息TTL我已经用python3和pika模块解释过了pika BasicProperties参数'expiration'(以毫秒为单位)必须设置为延迟队列中消息在设置了过期时间后,将消息发布到delayed_queue(“不是使用者正在等待使用的实际队列”),一旦delayed_queue中的消息过期,消息将使用exchange“amq.direct”路由到实际队列

def delay_publish(self, messages, queue, headers=None, expiration=0):
    """
    Connect to RabbitMQ and publish messages to the queue
    Args:
        queue (string): queue name
        messages (list or single item): messages to publish to rabbit queue
        expiration(int): TTL in milliseconds for message
    """
    delay_queue = "".join([queue, "_delay"])
    logging.info('Publishing To Queue: {queue}'.format(queue=delay_queue))
    logging.info('Connecting to RabbitMQ: {host}'.format(
        host=self.rabbit_host))
    credentials = pika.PlainCredentials(
       RABBIT_MQ_USER, RABBIT_MQ_PASS)
    parameters = pika.ConnectionParameters(
       rabbit_host, RABBIT_MQ_PORT,
        RABBIT_MQ_VHOST, credentials, heartbeat_interval=0)
    connection = pika.BlockingConnection(parameters)

    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=True)

    channel.queue_bind(exchange='amq.direct',
                       queue=queue)
    delay_channel = connection.channel()
    delay_channel.queue_declare(queue=delay_queue, durable=True,
                                arguments={
                                    'x-dead-letter-exchange': 'amq.direct',
                                    'x-dead-letter-routing-key': queue
                                })

    properties = pika.BasicProperties(
        delivery_mode=2, headers=headers, expiration=str(expiration))

    if type(messages) not in (list, tuple):
        messages = [messages]

    try:
        for message in messages:
            try:
                json_data = json.dumps(message)
            except Exception as err:
                logging.error(
                    'Error Jsonify Payload: {err}, {payload}'.format(
                        err=err, payload=repr(message)), exc_info=True
                )
                if (type(message) is dict) and ('data' in message):
                    message['data'] = {}
                    message['error'] = 'Payload Invalid For JSON'
                    json_data = json.dumps(message)
                else:
                    raise

            try:
                delay_channel.basic_publish(
                    exchange='', routing_key=delay_queue,
                    body=json_data, properties=properties)
            except Exception as err:
                logging.error(
                    'Error Publishing Data: {err}, {payload}'.format(
                        err=err, payload=json_data), exc_info=True
                )
                raise

    except Exception:
        raise

    finally:
        logging.info(
            'Done Publishing. Closing Connection to {queue}'.format(
                queue=delay_queue
            )
        )
        connection.close()
fxnxkyjh

fxnxkyjh6#

根据您的情况和需要,我建议使用以下方法,

相关问题