springboot:整合rabbitmq之死信队列

x33g5p2x  于2022-06-16 转载在 Spring  
字(6.1k)|赞(0)|评价(0)|浏览(586)

springboot:整合rabbitmq之死信队列

一、项目准备

依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-amqp</artifactId>
  8. </dependency>

配置类

  1. @Configuration
  2. public class RabbitMQConfiguration {
  3. @Bean
  4. public ConnectionFactory rabbitConnectionFactory() {
  5. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  6. String rabbitmqHost = "127.0.0.1";
  7. String rabbitmqPort = "5672";
  8. String rabbitmqUsername = "guest";
  9. String rabbitmqPassword = "guest";
  10. String rabbitmqVirtualHost = "/";
  11. connectionFactory.setHost(rabbitmqHost);
  12. connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
  13. connectionFactory.setUsername(rabbitmqUsername);
  14. connectionFactory.setPassword(rabbitmqPassword);
  15. connectionFactory.setVirtualHost(rabbitmqVirtualHost);
  16. // connectionFactory.setPublisherReturns(true);//开启return模式
  17. // connectionFactory.setPublisherConfirms(true);//开启confirm模式
  18. return connectionFactory;
  19. }
  20. @Bean(name = "rabbitTemplate")
  21. //必须是prototype类型
  22. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  23. public RabbitTemplate rabbitTemplate() {
  24. return new RabbitTemplate(rabbitConnectionFactory());
  25. }
  26. @Bean("customContainerFactory")
  27. public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
  28. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  29. //设置线程数
  30. factory.setConcurrentConsumers(1);
  31. //最大线程数
  32. factory.setMaxConcurrentConsumers(1);
  33. // //设置为手动确认MANUAL(手动),AUTO(自动);
  34. // factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  35. // 设置prefetch
  36. factory.setPrefetchCount(1);
  37. configurer.configure(factory, connectionFactory);
  38. return factory;
  39. }
  40. }

yaml配置文件

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. retry:
  6. enabled: true
  7. max-attempts: 5 # 重试次数
  8. max-interval: 10000 # 重试最大间隔时间
  9. initial-interval: 2000 # 重试初始间隔时间
  10. multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

二、死信队列介绍

创建一个普通队列时,通过添加配置绑定另一个交换机(死信交换机),在普通队列发生异常时,消息就通过死信交换机转发到绑定它的队列里,这个绑定死信交换机的队列就是死信队列

三、案例

创建生产者和消费者

  1. @Slf4j
  2. @RestController
  3. public class DLController {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. private int count = 1;
  7. @GetMapping("/dl")
  8. public void dl() {
  9. String message = "Hello World!";
  10. log.info(" [ 生产者 ] Sent ==> '" + message + "'");
  11. rabbitTemplate.convertAndSend("normal_exchange", "normal_key", message);
  12. }
  13. // 监听 normal_queue 正常队列
  14. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  15. value = @Queue(value = "normal_queue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  16. ,arguments = {
  17. @Argument(name = "x-dead-letter-exchange",value = "dlx_exchange"), //指定一下死信交换机
  18. @Argument(name = "x-dead-letter-routing-key",value = "dead_key"), //指定死信交换机的路由key
  19. //@Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") //指定队列的过期时间,type需要指定为Long,否则会抛异常
  20. //,@Argument(name = "x-max-length",value = "3") //指定队列最大长度,超过会被投入死信,至于type是否需要指定为Long,本人没试过
  21. }
  22. ),
  23. exchange = @Exchange(value = "normal_exchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  24. key = "normal_key"
  25. )
  26. })
  27. public void normal(Message message) {
  28. log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
  29. log.info("当前执行次数:{}", count++);
  30. int i = 1 / 0;
  31. log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody()));
  32. }
  33. // 监听死信队列
  34. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  35. value = @Queue(value = "dlx_queue"),
  36. exchange = @Exchange(value = "dlx_exchange"),//Exchang的默认类型就是direct,所以type可以不写
  37. key = "dead_key"
  38. )
  39. })
  40. public void dl(Message message) {
  41. log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
  42. }
  43. }

测试结果

服务器上normal-queue有DLX、DLK标识,说明该队列绑定了死信交换机和路由键;
重试5次之后,就将消息转发给死信队列

修改消费者、手动确认

  1. @Slf4j
  2. @RestController
  3. public class DLController {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. private int count = 1;
  7. @GetMapping("/dl")
  8. public void dl() {
  9. String message = "Hello World!";
  10. log.info(" [ 生产者 ] Sent ==> '" + message + "'");
  11. rabbitTemplate.convertAndSend("normal_exchange", "normal_key", message);
  12. }
  13. // 监听 normal_queue 正常队列
  14. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  15. value = @Queue(value = "normal_queue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  16. ,arguments = {
  17. @Argument(name = "x-dead-letter-exchange",value = "dlx_exchange"), //指定一下死信交换机
  18. @Argument(name = "x-dead-letter-routing-key",value = "dead_key"), //指定死信交换机的路由key
  19. //@Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") //指定队列的过期时间,type需要指定为Long,否则会抛异常
  20. //,@Argument(name = "x-max-length",value = "3") //指定队列最大长度,超过会被投入死信,至于type是否需要指定为Long,本人没试过
  21. }
  22. ),
  23. exchange = @Exchange(value = "normal_exchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  24. key = "normal_key"
  25. )
  26. })
  27. public void normal(Message message, Channel channel) throws IOException {
  28. log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
  29. log.info("当前执行次数:{}", count++);
  30. try {
  31. // 制造异常
  32. int i = 1 / 0;
  33. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  34. log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody()));
  35. } catch (Exception e) {
  36. log.info("捕获异常,不会启动重试机制,异常消息直接转发到死信队列");
  37. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  38. }
  39. }
  40. // 监听死信队列
  41. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  42. value = @Queue(value = "dlx_queue"),
  43. exchange = @Exchange(value = "dlx_exchange"),//Exchang的默认类型就是direct,所以type可以不写
  44. key = "dead_key"
  45. )
  46. })
  47. public void dl(Message message) {
  48. log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
  49. }
  50. }

修改配置类

或者修改yaml文件,二选一

重启测试

四、总结

  • 手动确认并且主动捕获了异常是不会触发重试机制,异常消息直接转发到死信队列
  • 死信队列是针对某个队列发生异常时进行处理
  • 重试机制中的RepublishMessageRecoverer是对所有队列发生异常时进行处理,并且优先于死信队列

相关文章