springboot:整合rabbitmq之延迟队列

x33g5p2x  于2022-06-16 转载在 Spring  
字(7.0k)|赞(0)|评价(0)|浏览(661)

springboot:整合rabbitmq之延迟队列

一、项目准备

依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-amqp</artifactId>
  8. </dependency>

配置类

  1. @Configuration
  2. public class RabbitMQConfiguration {
  3. @Bean
  4. public ConnectionFactory rabbitConnectionFactory() {
  5. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  6. String rabbitmqHost = "127.0.0.1";
  7. String rabbitmqPort = "5672";
  8. String rabbitmqUsername = "guest";
  9. String rabbitmqPassword = "guest";
  10. String rabbitmqVirtualHost = "/";
  11. connectionFactory.setHost(rabbitmqHost);
  12. connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
  13. connectionFactory.setUsername(rabbitmqUsername);
  14. connectionFactory.setPassword(rabbitmqPassword);
  15. connectionFactory.setVirtualHost(rabbitmqVirtualHost);
  16. // connectionFactory.setPublisherReturns(true);//开启return模式
  17. // connectionFactory.setPublisherConfirms(true);//开启confirm模式
  18. return connectionFactory;
  19. }
  20. @Bean(name = "rabbitTemplate")
  21. //必须是prototype类型
  22. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  23. public RabbitTemplate rabbitTemplate() {
  24. return new RabbitTemplate(rabbitConnectionFactory());
  25. }
  26. @Bean("customContainerFactory")
  27. public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
  28. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  29. //设置线程数
  30. factory.setConcurrentConsumers(1);
  31. //最大线程数
  32. factory.setMaxConcurrentConsumers(1);
  33. // //设置为手动确认MANUAL(手动),AUTO(自动);
  34. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  35. // 设置prefetch
  36. factory.setPrefetchCount(1);
  37. configurer.configure(factory, connectionFactory);
  38. return factory;
  39. }
  40. }

二、延迟队列

一般来说,发布消息之后,会被交换机接收并转发给对应的队列,队列分配给消费者处理,这个过程很快秒级处理;但有时候我们希望发布完消息后,在指定的时间之后再去处理消息,这个时候就需要使用到延时队列;
虽说是延时队列,但其实也只是对死信队列的一种扩展应用罢了。

三、案例(对queue所有消息进行设置)

创建生产者和消费者

首先还是得创建普通队列,添加参数绑定死信队列同时设置消息过期时间,生产者发布消息到普通队列,而普通队列没有任何消费者来消费,那么消息在普通队列中存活到设定过期时间就被转发到死信队列,由死信队列的消费者消费消息,以此实现延时功能

  1. @Slf4j
  2. @RestController
  3. public class DL2Controller {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. private int count = 1;
  7. @GetMapping("/dl2")
  8. public void dl() {
  9. String message = "Hello World222222222!";
  10. log.info(" [ 生产者 ] Sent ==> '" + message + "'");
  11. rabbitTemplate.convertAndSend("normal_exchange2", "normal_key2", message);
  12. }
  13. // 监听 normal_queue 正常队列
  14. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  15. value = @Queue(value = "normal_queue2",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  16. ,arguments = {
  17. @Argument(name = "x-dead-letter-exchange",value = "dlx_exchange2"), //指定一下死信交换机
  18. @Argument(name = "x-dead-letter-routing-key",value = "dead_key2"), //指定死信交换机的路由key
  19. @Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") //指定队列的过期时间,type需要指定为Long,否则会抛异常
  20. //,@Argument(name = "x-max-length",value = "3") //指定队列最大长度,超过会被投入死信,至于type是否需要指定为Long,本人没试过
  21. }
  22. ),
  23. exchange = @Exchange(value = "normal_exchange2",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  24. key = "normal_key2"
  25. )
  26. })
  27. public void normal(Message message, Channel channel) throws IOException, InterruptedException {
  28. log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
  29. /*
  30. * deliveryTag:该消息的index
  31. * multiple: ture确认本条消息以及之前没有确认的消息(批量),false仅确认本条消息
  32. * requeue: true该条消息重新返回MQ queue,MQ broker将会重新发送该条消息
  33. */
  34. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  35. }
  36. // 监听死信队列
  37. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  38. value = @Queue(value = "dlx_queue2"),
  39. exchange = @Exchange(value = "dlx_exchange2"),//Exchang的默认类型就是direct,所以type可以不写
  40. key = "dead_key2"
  41. )
  42. })
  43. public void dl(Message message, Channel channel) throws IOException {
  44. log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
  45. //打印完直接丢弃消息
  46. channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
  47. }
  48. }

测试结果

这里我们开启手动ack,然后在普通队列中拒绝ack并重新返回队列,当消息在队列时间超过3s,就会进入延迟队列

四、案例(对queue每一条消息消息进行设置)

创建生产者和消费者

  1. package com.yolo.springbootrabbitmqproducer.controller;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.ExchangeTypes;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.core.MessageProperties;
  7. import org.springframework.amqp.rabbit.annotation.*;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.web.bind.annotation.GetMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import java.io.IOException;
  13. import java.nio.charset.StandardCharsets;
  14. @Slf4j
  15. @RestController
  16. public class DL3Controller {
  17. @Autowired
  18. private RabbitTemplate rabbitTemplate;
  19. @GetMapping("/dl3")
  20. public void dl() {
  21. String s = "Hello World! 3333333333";
  22. log.info(" [ 生产者 ] Sent ==> '" + s + "'");
  23. //设置过期时间
  24. MessageProperties messageProperties = new MessageProperties();
  25. messageProperties.setExpiration("12000");
  26. Message message = new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties);
  27. rabbitTemplate.convertAndSend("normal_exchange3", "normal_key3", message);
  28. }
  29. // 监听 normal_queue 正常队列
  30. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  31. value = @Queue(value = "normal_queue3",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  32. ,arguments = {
  33. @Argument(name = "x-dead-letter-exchange",value = "dlx_exchange3"), //指定一下死信交换机
  34. @Argument(name = "x-dead-letter-routing-key",value = "dead_key3"), //指定死信交换机的路由key
  35. //@Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") //指定队列的过期时间,type需要指定为Long,否则会抛异常
  36. //,@Argument(name = "x-max-length",value = "3") //指定队列最大长度,超过会被投入死信,至于type是否需要指定为Long,本人没试过
  37. }
  38. ),
  39. exchange = @Exchange(value = "normal_exchange3",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  40. key = "normal_key3"
  41. )
  42. })
  43. public void normal(Message message, Channel channel) throws IOException, InterruptedException {
  44. /*
  45. * deliveryTag:该消息的index
  46. * multiple: ture确认本条消息以及之前没有确认的消息(批量),false仅确认本条消息
  47. * requeue: true该条消息重新返回MQ queue,MQ broker将会重新发送该条消息
  48. */
  49. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  50. }
  51. // 监听死信队列
  52. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  53. value = @Queue(value = "dlx_queue3"),
  54. exchange = @Exchange(value = "dlx_exchange3"),//Exchang的默认类型就是direct,所以type可以不写
  55. key = "dead_key3"
  56. )
  57. })
  58. public void dl(Message message, Channel channel) throws IOException {
  59. log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
  60. //打印完直接丢弃消息
  61. channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
  62. }
  63. }

重启测试

相关文章