spring amqp(rabbitmq),发生异常时发送给DLQ

f4t66c6m  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(172)

我使用的是org.springframework.boot:spring-boot-starter-amqp:2.6.6。根据文档,我设置了@RabbitListener-我使用的是SimpleRabbitListenerContainerFactory,配置如下所示:

@Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ObjectMapper om) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(rabbitProperties.getUpdater().getConcurrentConsumers());
        factory.setMaxConcurrentConsumers(rabbitProperties.getUpdater().getMaxConcurrentConsumers());
        factory.setMessageConverter(new Jackson2JsonMessageConverter(om));
        factory.setAutoStartup(rabbitProperties.getUpdater().getAutoStartup());
        factory.setDefaultRequeueRejected(false);
        return factory;
    }

服务的逻辑是接收来自rabbitmq的消息,通过rest API联系外部服务(使用rest模板)并根据响应结果将一些信息放入数据库中(使用springdatajpa)。服务成功地实现了它,但是在测试过程中遇到了一些问题,如果在那些抛出堆栈的工作过程中发生任何异常,消息没有被发送到配置的dlq,而是简单地挂在代理中作为unacked.你能告诉我你怎么能告诉spring amqp如果发生任何错误,你需要把消息重定向到dlq吗?
侦听器本身看起来像这样:

@RabbitListener(
            queues = {"${rabbit.updater.consuming.queue.name}"},
            containerFactory = "rabbitListenerContainerFactory"
    )
    @Override
    public void listen(
            @Valid @Payload MessageDTO message,
            Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag
    ) {

        log.debug(DebugMessagesConstants.RECEIVED_MESSAGE_FROM_QUEUE, message, deliveryTag);

        messageUpdater.process(message);
        channel.basicAck(deliveryTag, false);

        log.debug(DebugMessagesConstants.PROCESSED_MESSAGE_FROM_QUEUE, message, deliveryTag);

    }

在兔子管理中,它看起来像这样:enter image description here和unacked将挂起,直到队列使用应用程序停止

mhd8tkvw

mhd8tkvw1#

请参阅错误处理文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/#annotation-error-handling。
因此,您不必执行AcknowledgeMode.MANUAL,而是依赖死信交换配置来处理那些在出错时被拒绝的消息。
或者尝试在messageUpdater.process(message);异常的情况下使用this.channel.basicNack(deliveryTag, false, false) ...

相关问题