我使用的是org.springframework.boot:spring-boot-starter-amqp:2.6.6。根据文档,我设置了@RabbitListener
-我使用的是SimpleRabbitListenerContainerFactory
,配置如下所示:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ObjectMapper om) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConcurrentConsumers(rabbitProperties.getUpdater().getConcurrentConsumers());
factory.setMaxConcurrentConsumers(rabbitProperties.getUpdater().getMaxConcurrentConsumers());
factory.setMessageConverter(new Jackson2JsonMessageConverter(om));
factory.setAutoStartup(rabbitProperties.getUpdater().getAutoStartup());
factory.setDefaultRequeueRejected(false);
return factory;
}
服务的逻辑是接收来自rabbitmq的消息,通过rest API联系外部服务(使用rest模板)并根据响应结果将一些信息放入数据库中(使用springdatajpa)。服务成功地实现了它,但是在测试过程中遇到了一些问题,如果在那些抛出堆栈的工作过程中发生任何异常,消息没有被发送到配置的dlq,而是简单地挂在代理中作为unacked.你能告诉我你怎么能告诉spring amqp如果发生任何错误,你需要把消息重定向到dlq吗?
侦听器本身看起来像这样:
@RabbitListener(
queues = {"${rabbit.updater.consuming.queue.name}"},
containerFactory = "rabbitListenerContainerFactory"
)
@Override
public void listen(
@Valid @Payload MessageDTO message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag
) {
log.debug(DebugMessagesConstants.RECEIVED_MESSAGE_FROM_QUEUE, message, deliveryTag);
messageUpdater.process(message);
channel.basicAck(deliveryTag, false);
log.debug(DebugMessagesConstants.PROCESSED_MESSAGE_FROM_QUEUE, message, deliveryTag);
}
在兔子管理中,它看起来像这样:enter image description here和unacked将挂起,直到队列使用应用程序停止
1条答案
按热度按时间mhd8tkvw1#
请参阅错误处理文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/#annotation-error-handling。
因此,您不必执行
AcknowledgeMode.MANUAL
,而是依赖死信交换配置来处理那些在出错时被拒绝的消息。或者尝试在
messageUpdater.process(message);
异常的情况下使用this.channel.basicNack(deliveryTag, false, false)
...