springboot:整合rabbitmq之重试机制

x33g5p2x  于2022-06-16 转载在 Spring  
字(5.2k)|赞(0)|评价(0)|浏览(692)

springboot:整合rabbitmq之重试机制

一、项目准备

依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-amqp</artifactId>
  8. </dependency>

配置类

  1. @Configuration
  2. public class RabbitMQConfiguration {
  3. @Bean
  4. public ConnectionFactory rabbitConnectionFactory() {
  5. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  6. String rabbitmqHost = "127.0.0.1";
  7. String rabbitmqPort = "5672";
  8. String rabbitmqUsername = "guest";
  9. String rabbitmqPassword = "guest";
  10. String rabbitmqVirtualHost = "/";
  11. connectionFactory.setHost(rabbitmqHost);
  12. connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
  13. connectionFactory.setUsername(rabbitmqUsername);
  14. connectionFactory.setPassword(rabbitmqPassword);
  15. connectionFactory.setVirtualHost(rabbitmqVirtualHost);
  16. // connectionFactory.setPublisherReturns(true);//开启return模式
  17. // connectionFactory.setPublisherConfirms(true);//开启confirm模式
  18. return connectionFactory;
  19. }
  20. @Bean(name = "rabbitTemplate")
  21. //必须是prototype类型
  22. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  23. public RabbitTemplate rabbitTemplate() {
  24. return new RabbitTemplate(rabbitConnectionFactory());
  25. }
  26. @Bean("customContainerFactory")
  27. public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
  28. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  29. //设置线程数
  30. factory.setConcurrentConsumers(1);
  31. //最大线程数
  32. factory.setMaxConcurrentConsumers(1);
  33. // //设置为手动确认MANUAL(手动),AUTO(自动);
  34. // factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  35. // 设置prefetch
  36. factory.setPrefetchCount(1);
  37. configurer.configure(factory, connectionFactory);
  38. return factory;
  39. }
  40. }

二、案例重现

  1. @Slf4j
  2. @RestController
  3. public class RetryController {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @GetMapping("/retry")
  7. public void retry() {
  8. String message = "Hello World !";
  9. rabbitTemplate.convertAndSend("retry_exchange", "retry_key", message);
  10. System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
  11. }
  12. private int count = 1;
  13. @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
  14. value = @Queue(value = "retry_a", declare = "true"),
  15. exchange = @Exchange(value = "retry_exchange", type = ExchangeTypes.DIRECT, durable = "true"),
  16. key = "retry_key"
  17. )
  18. })
  19. public void retry(Message message) {
  20. log.info("当前执行次数:{}", count++);
  21. log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
  22. // 制造异常
  23. int i = 1 / 0;
  24. log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody()));
  25. }
  26. }

启动测试:

无限循环报错
停止后,消息重回Ready状态

三、实现消息重试

实现重试

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. retry:
  6. enabled: true
  7. max-attempts: 5 # 重试次数
  8. max-interval: 10000 # 重试最大间隔时间
  9. initial-interval: 2000 # 重试初始间隔时间
  10. multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

重启测试

第一次执行时间2s,第二次4s,第三次8s,第四次16s,第五次由于设置了最大间隔为10s,所有变成了10s

最后查看retry_a队列,消息没有了,也就是说重试五次失败之后就会移除该消息

移除操作是由日志中的这个类处理:RejectAndDontRequeueRecoverer(拒绝和不要重新排队)

对重试失败的消息重新排队

使用下 ImmediateRequeueMessageRecoverer 重新排队在RabbitMQConfiguration中配置

  1. @Bean
  2. public MessageRecoverer messageRecoverer() {
  3. return new ImmediateRequeueMessageRecoverer();
  4. }

重启运行:

可以看出:重试5次之后,返回队列,然后再重试5次,周而复始直到不抛出异常为止,这样还是会影响后续的消息消费

把重试失败消息放入重试失败队列

接着使用 RepublishMessageRecoverer 重新发布在RabbitMQConfiguration中配置

  1. public static final String RETRY_FAILURE_KEY = "retry.failure.key";
  2. public static final String RETRY_EXCHANGE = "retry_exchange";
  3. //@Bean 这个注释掉了
  4. public MessageRecoverer messageRecoverer() {
  5. return new ImmediateRequeueMessageRecoverer();
  6. }
  7. @Bean
  8. public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
  9. // 需要配置交换机和绑定键
  10. return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, RETRY_FAILURE_KEY);
  11. }

创建重试失败消息监听

  1. @Slf4j
  2. @RestController
  3. public class RetryController {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @GetMapping("/retry")
  7. public void retry() {
  8. String message = "Hello World !";
  9. rabbitTemplate.convertAndSend("retry_exchange", "retry_key", message);
  10. System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
  11. }
  12. private int count = 1;
  13. @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
  14. value = @Queue(value = "retry_a", declare = "true"),
  15. exchange = @Exchange(value = "retry_exchange", type = ExchangeTypes.DIRECT, durable = "true"),
  16. key = "retry_key"
  17. )
  18. })
  19. public void retry(Message message) {
  20. log.info("当前执行次数:{}", count++);
  21. log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
  22. // 制造异常
  23. int i = 1 / 0;
  24. log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody()));
  25. }
  26. @RabbitListener(containerFactory = "customContainerFactory",bindings = @QueueBinding(
  27. value = @Queue(value = "retry_failure_queue"),
  28. exchange = @Exchange(value = "retry_exchange"),
  29. key = "retry.failure.key"
  30. ))
  31. public void retryFailure(Message message) {
  32. log.info(" [ 消费者@重试失败号 ] 接收到消息 ==> '" + new String(message.getBody()));
  33. }
  34. }

重启,运行结果:

重试5次之后,将消息 Republishing failed message to exchange ‘retry.exchange’ with routing key retry-key 转发到重试失败队列,由重试失败消费者消费

开发者涨薪指南

48位大咖的思考法则、工作方式、逻辑体系

相关文章