I have a simple Spring Boot application with the following RabbitMQ settings (spring-boot-starter-amqp version 2.7.0):
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: my_host
    username: admin
    password: password
    template:
      retry:
        enabled: true
        initial-interval: 3s
        max-interval: 10s
        multiplier: 2
        max-attempts: 3
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 3s
          max-interval: 10s
          multiplier: 2
          max-attempts: 3
The SimpleRabbitListenerContainerFactory is configured with a MessagePostProcessor:
@Bean
public SimpleRabbitListenerContainerFactory myInterceptContainerFactory(final SimpleRabbitListenerContainerFactoryConfigurer configurer,
        final ConnectionFactory connectionFactory,
        final MyMessagePostProcessor myMessagePostProcessor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory() {
        @Override
        protected void initializeContainer(SimpleMessageListenerContainer instance, RabbitListenerEndpoint endpoint) {
            instance.setAfterReceivePostProcessors(myMessagePostProcessor);
            super.initializeContainer(instance, endpoint);
        }
    };
    ((CachingConnectionFactory) connectionFactory).setRequestedHeartBeat(60);
    configurer.configure(factory, connectionFactory);
    return factory;
}
The MessagePostProcessor is very simple: it throws an AmqpRejectAndDontRequeueException if the version in the message header is wrong:
@Component
public class MyMessagePostProcessor implements MessagePostProcessor {
    private static final Logger LOGGER = LogManager.getLogger(MyMessagePostProcessor.class);

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        if (!"1.0".equalsIgnoreCase(message.getMessageProperties().getHeader("version"))) {
            LOGGER.error("wrong version");
            throw new AmqpRejectAndDontRequeueException("wrong version");
        }
        return message;
    }
}
When I send a message with the wrong version, it works as expected, according to the logs:
MyApp: 2022-07-05 15:49:33,675 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.p.MyMessagePostProcessor:22 - wrong version
MyApp: 2022-07-05 15:49:33,676 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.l.ConditionalRejectingErrorHandler:169 - Execution of Rabbit message listener failed.
org.springframework.amqp.AmqpRejectAndDontRequeueException: wrong version
at com.my.app.queue.postprocess.MyMessagePostProcessor.postProcessMessage(MyMessagePostProcessor.java:23) ~[classes/:1.0.0]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1544) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
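My consumer only logs an error and immediately throws an AmqpRejectAndDontRequeueException, so that I can check whether the message goes to the dead-letter queue without being retried: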
@Component
public class MyMessageConsumer {
    private static final Logger LOGGER = LogManager.getLogger(MyMessageConsumer.class);

    @RabbitListener(queues = "my.queue", containerFactory = "myInterceptContainerFactory")
    public void consumerMessage() {
        LOGGER.error("error in consuming message");
        throw new AmqpRejectAndDontRequeueException("over");
    }
}
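But the logs clearly show that it was attempted 3 times (my error log is printed 3 times), and only after that was the message delivered to the dead-letter queue: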
MyApp: 2022-07-05 15:50:32,182 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:46,603 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:52,617 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:52,618 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer:74 - Retries exhausted for message (Body:'[B@4060893b(byte[75])' MessageProperties [headers={version=1.0, requestor=MyApp}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=my.queue, deliveryTag=2, consumerTag=amq.ctag-5Xx6EwwjVokRV6wD9xL8Og, consumerQueue=my.queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid()' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:271) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122) ~[spring-retry-1.3.3.jar:?]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.$Proxy187.invokeListener(Unknown Source) ~[?:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1564) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: over
at com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid(MyMessageConsumer.java:24) ~[classes/:1.0.0]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:262) ~[spring-rabbit-2.4.5.jar:2.4.5]
... 27 more
MyApp: 2022-07-05 15:50:52,620 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.l.ConditionalRejectingErrorHandler:169 - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:77) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122) ~[spring-retry-1.3.3.jar:?]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.$Proxy187.invokeListener(Unknown Source) ~[?:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1564) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException
... 19 more
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid()' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:271) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
... 14 more
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: over
at com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid(MyMessageConsumer.java:24) ~[classes/:1.0.0]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:262) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
... 14 more
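But I want the message to be delivered to the dead-letter queue without being retried 3 times.
I suspect that the ListenerExecutionFailedException: Retry Policy Exhausted is the problem, but I don't understand why my AmqpRejectAndDontRequeueException is wrapped in this exception, since my MessagePostProcessor does not throw a ListenerExecutionFailedException, only the exception I throw in my own code.
What am I missing?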
2 Answers

#1 llycmphe
As @Gary Russell suggested, I changed the retry policy by defining a new bean. With just this new bean configured, everything works as expected!
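
The bean itself is not shown in this answer. What follows is a minimal sketch of what such a retry interceptor bean might look like, assuming Spring AMQP 2.4.x and Spring Retry 1.3.x (the versions visible in the question's stack traces). The class name `ListenerRetryConfig` and the bean name `listenerRetryInterceptor` are hypothetical, the back-off values simply mirror the question's properties, and this is not necessarily the code the answerer used:

```java
import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class ListenerRetryConfig { // hypothetical class name

    @Bean
    public RetryOperationsInterceptor listenerRetryInterceptor() { // hypothetical bean name
        // Exceptions listed here with 'false' are never retried.
        Map<Class<? extends Throwable>, Boolean> retryable =
                Map.of(AmqpRejectAndDontRequeueException.class, false);

        // maxAttempts = 3, traverseCauses = true, defaultValue = true.
        // traverseCauses matters because the listener's exception reaches the
        // policy wrapped in a ListenerExecutionFailedException.
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryable, true, true);

        // Same back-off values as in the question's application properties.
        ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(3000L);
        backOff.setMultiplier(2.0);
        backOff.setMaxInterval(10000L);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOff);

        // When no (further) retry is allowed, reject the message without requeueing,
        // so the broker can route it to the dead-letter queue.
        return RetryInterceptorBuilder.stateless()
                .retryOperations(retryTemplate)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }
}
```

In the question's `myInterceptContainerFactory`, the interceptor could then be injected as an extra method parameter and applied with `factory.setAdviceChain(listenerRetryInterceptor)`. That call should probably come after `configurer.configure(factory, connectionFactory)`, because with `spring.rabbitmq.listener.simple.retry.enabled: true` the configurer is expected to install its own retry advice built from the properties.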

#2 mutmk8jj
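The container knows nothing about the exception until retries are exhausted.
You need to customize the retry policy so that this exception is not retried.
This currently cannot be done with application properties; you need to configure the retry interceptor as a bean and inject it into the container factory.
I can add an example if you need help.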
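
To illustrate the mechanism described in this answer, here is a small dependency-free sketch. It is not Spring Retry's actual implementation; `RetrySketch`, `RejectException`, `hasRejectCause`, and `runWithRetry` are hypothetical names. It shows a retry loop that sits between the container and the listener, and a policy that inspects the cause chain so that a wrapped reject exception stops the retries after the first attempt:

```java
import java.util.function.Predicate;

public class RetrySketch {

    /** Hypothetical stand-in for AmqpRejectAndDontRequeueException. */
    public static class RejectException extends RuntimeException {
        public RejectException(String message) {
            super(message);
        }
    }

    /** Returns true if the exception or any of its causes is a RejectException. */
    public static boolean hasRejectCause(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof RejectException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Invokes the listener until it succeeds, the policy refuses a retry,
     * or maxAttempts is reached. Returns the number of attempts made.
     */
    public static int runWithRetry(Runnable listener, int maxAttempts, Predicate<Throwable> retryable) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                listener.run();
                return attempts; // success, nothing to retry
            } catch (RuntimeException e) {
                if (!retryable.test(e)) {
                    break; // the policy says this exception must not be retried
                }
            }
        }
        return attempts; // only now would the caller (the container) see the failure
    }

    public static void main(String[] args) {
        // The listener's exception arrives wrapped, as in the question's stack trace.
        Runnable listener = () -> {
            throw new RuntimeException("listener failed", new RejectException("over"));
        };

        // A policy that retries every exception: all 3 attempts are used.
        System.out.println(runWithRetry(listener, 3, e -> true)); // prints 3

        // A policy that checks the cause chain: gives up after the first attempt.
        System.out.println(runWithRetry(listener, 3, e -> !hasRejectCause(e))); // prints 1
    }
}
```

The second call corresponds to the customized policy suggested above: because the reject exception is found in the cause chain, the failure is handed on after a single attempt instead of three.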