var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});
// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
deadLetterExchange: "my_final_delayed_exchange",
messageTtl: 5000, // 5sec
}, function (err, q) {
ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});
ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
ch.bindQueue(q.queue, "my_final_delayed_exchange", '');
ch.consume(q.queue, function (msg) {
console.log("delayed - [x] %s", msg.content.toString());
}, {noAck: true});
});
Declare the delayed queue
Add the x-dead-letter-exchange argument property, and set it to the default exchange "".
Add the x-dead-letter-routing-key argument property, and set it to the name of the destination queue.
Add the x-message-ttl argument property, and set it to the number of milliseconds you want to delay the message.
Subscribe to the destination queue
字符串 RabbitMQ repository on GitHub中还有一个延迟消息传递插件。 请注意,有一个名为Celery的解决方案,它通过提供一个名为apply_async()的调用API来支持RabbitMQ代理上的延迟任务队列。Celery支持Python、Node和PHP。
8条答案
按热度按时间rta7y2nd1#
有两种方法可以尝试:
**老方法:**在每个消息/队列(策略)中设置TTL(生存时间)头,然后引入DLQ来处理它。一旦ttl过期,您的消息将从DLQ移动到主队列,以便侦听器可以处理它。
最新方法:最近RabbitMQ推出了RabbitMQ延迟消息插件,使用该插件可以实现相同的功能,该插件支持从RabbitMQ-3.5.8开始。
您可以使用x-delayed-message类型声明一个交换,然后使用自定义头x-delay发布消息,以毫秒表示消息的延迟时间。消息将在x-delay毫秒后传递到相应的队列
字符串
更多信息:git
wgxvkvu92#
随着RabbitMQ v2.8的发布,计划交付现在可用,但作为间接功能:http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html
rjjhvcjd3#
感谢Norman's answer,我可以在Node.js中实现它。
代码中的一切都很清楚。
字符串
wbgh16ku4#
由于我没有足够的声誉添加评论,张贴一个新的答案。这只是对http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html中已经讨论过的内容的补充
除了不在消息上设置ttl之外,您可以在队列级别设置它。你也可以避免仅仅为了将消息重定向到不同的队列而创建一个新的交换。下面是示例Java代码:
制作人:
字符串
消费者:
型
9udxz4iz5#
看起来像this blog post描述了使用死信交换和消息ttl来做类似的事情。
下面的代码使用CoffeeScript和Node.js来访问Rabbit并实现类似的东西。
字符串
woobm2wo6#
目前还不可能。你必须将你的过期时间戳存储在一个数据库或类似的东西中,然后有一个助手程序来读取这些时间戳并将消息排队。
延迟消息是一个经常需要的功能,因为它们在许多情况下都很有用。但是,如果您需要终止客户端会话,我认为消息传递并不是您的理想解决方案,另一种方法可能会更好。
py49o6xq7#
假设你可以控制消费者,你可以像这样实现对消费者的延迟??:
如果我们确定队列中的第n条消息总是比第n+1条消息具有更小的延迟(这对于许多用例都是正确的):生产者在任务中发送timeInformation,传递需要执行此作业的时间(currentTime + delay)。消费者:
1)从任务中读取scheduledTime
2)如果currentTime > scheduledTime,则继续。
否则delay = scheduledTime - currentTime
延迟指示睡眠时间
使用者始终配置有并发参数。因此,其他消息将在队列中等待,直到消费者完成作业。因此,这个解决方案可以很好地工作,尽管它看起来很尴尬,特别是对于大的时间延迟。
3pvhb19x8#
AMQP协议不支持延迟消息传递,但通过使用Time-To-Live and Expiration和Dead Letter Exchanges扩展,延迟消息传递是可能的。解决方案在此描述。我从该链接复制了以下步骤:
一步一步:
字符串
RabbitMQ repository on GitHub中还有一个延迟消息传递插件。
请注意,有一个名为Celery的解决方案,它通过提供一个名为
apply_async()
的调用API来支持RabbitMQ代理上的延迟任务队列。Celery支持Python、Node和PHP。