我需要知道如何直接从我创建的使用者手动确认队列上的消息,并设置5次重试策略,每次尝试增加时间,如第二次尝试5分钟,第三次尝试,第二次尝试失败后10分钟,第四次15分钟...
我有点迷失在兔子的文档,我学到了一点概念,但实际使用仍然是一个谜给我...
我使用的是Symfony 6.1,我的旧的_sound_rabbit_mq.yaml看起来像这样:
old_sound_rabbit_mq:
connections:
default:
host: '%rabbitmqHost%'
port: '%rabbitmqPort%'
user: '%rabbitmqUser%'
password: '%rabbitmqPassword%'
vhost: '%rabbitmqVhost%'
consumers:
upload_file:
connection: default
exchange_options: { name: 'upload_file_exchange', type: direct, durable: true, auto_delete: false }
queue_options: { name: 'upload_file_queue', durable: true, auto_delete: false, arguments: { 'x-max-priority': [ 'I', 20 ] } }
callback: App\Consumer\UploadFileConsumer
qos_options: { prefetch_size: 0, prefetch_count: 1, global: false }
这是我的消费者:
<?php
declare(strict_types=1);
namespace App\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
class UploadFileConsumer implements ConsumerInterface
{
public function execute(AMQPMessage $msg): void
{
try {
// do something with $msg, if all is good then ack the msg and remove from queue
} catch (\Exception $e) {
// keep message in queue, don't ack it, keep it in queue retry 5 times then stop consumer if no success
}
}
}
1条答案
按热度按时间wrrgggsh1#
AMQPMessage为此提供了
ack()
和nack()
方法。https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Message/AMQPMessage.php#L98-L128
所以很可能你想要的是:
虽然我不熟悉如何限制消息重新排队的次数而不修改消息头/有效负载并将其作为新消息重新排队。或者,您可以设置一个TTL值,消息最终将超时退出队列。如果您想检查nack'ed/过期的消息,您也可以创建一个死信交换。[只要确保将其清除,否则您将遇到新的问题]
如果我不得不在“重新排队X次”中拼凑,我会建议一个像Redis这样的内置TTL的缓存,关键字是消息ID,值是重试次数。
编辑:
按优先级降序列出“任务A必须在任务B开始之前完成”的一些工作流:
并且如果您的队列没有定义TTL,请始终注意“无限重新排队,”因为消息将继续累积,您的使用者将不断地处理可能永远无法成功完成的任务。