如何在RabbitMQ中手动确认消息?

plicqrtu  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(420)

我需要知道如何直接从我创建的使用者手动确认队列上的消息,并设置5次重试策略,每次尝试增加时间,如第二次尝试5分钟,第三次尝试,第二次尝试失败后10分钟,第四次15分钟...
我有点迷失在兔子的文档,我学到了一点概念,但实际使用仍然是一个谜给我...
我使用的是Symfony 6.1,我的旧的_sound_rabbit_mq.yaml看起来像这样:

old_sound_rabbit_mq:
    connections:
        default:
            host: '%rabbitmqHost%'
            port: '%rabbitmqPort%'
            user: '%rabbitmqUser%'
            password: '%rabbitmqPassword%'
            vhost: '%rabbitmqVhost%'
    consumers:
        upload_file:
            connection: default
            exchange_options: { name: 'upload_file_exchange', type: direct, durable: true, auto_delete: false }
            queue_options: { name: 'upload_file_queue', durable: true, auto_delete: false, arguments: { 'x-max-priority': [ 'I', 20 ] } }
            callback: App\Consumer\UploadFileConsumer
            qos_options: { prefetch_size: 0, prefetch_count: 1, global: false }

这是我的消费者:

<?php

declare(strict_types=1);

namespace App\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadFileConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg): void
    {
        try {
            // do something with $msg, if all is good then ack the msg and remove from queue
        } catch (\Exception $e) {
            // keep message in queue, don't ack it, keep it in queue retry 5 times then stop consumer if no success
        }
    }
}
wrrgggsh

wrrgggsh1#

AMQPMessage为此提供了ack()nack()方法。
https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Message/AMQPMessage.php#L98-L128
所以很可能你想要的是:

<?php

declare(strict_types=1);

namespace App\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadFileConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg): void
    {
        try {
            // do something with $msg
            $msg->ack();
        } catch (\Exception $e) {
            // keep message in queue, don't ack it, keep it in queue retry
            $msg->nack(true);
        }
    }
}

虽然我不熟悉如何限制消息重新排队的次数而不修改消息头/有效负载并将其作为新消息重新排队。或者,您可以设置一个TTL值,消息最终将超时退出队列。如果您想检查nack'ed/过期的消息,您也可以创建一个死信交换。[只要确保将其清除,否则您将遇到新的问题]
如果我不得不在“重新排队X次”中拼凑,我会建议一个像Redis这样的内置TTL的缓存,关键字是消息ID,值是重试次数。

编辑:

按优先级降序列出“任务A必须在任务B开始之前完成”的一些工作流:

  • 如果任务A和任务B不能彼此独立,则将任务A和任务B合并为一个任务,因为它们不是独立的。
  • 如果没有A,任务B无法发生,则让B以同步/RPC方式调用A。
  • 创建以同步/RPC方式调用A和B的新异步任务C。
  • 任务A以将任务B提交到队列结束。[这仍然有“A和B”实际上是一个工作单元的味道]
  • 从外部跟踪任务状态[例如:如果任务A还没有完成,则使任务B nack/requeue。

并且如果您的队列没有定义TTL,请始终注意“无限重新排队,”因为消息将继续累积,您的使用者将不断地处理可能永远无法成功完成的任务。

相关问题