springboot:整合rabbitmq之消息确认机制ack

x33g5p2x  于2022-06-16 转载在 Spring  
字(9.3k)|赞(0)|评价(0)|浏览(637)

springboot:整合rabbitmq之消息确认机制ack

一、依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-amqp</artifactId>
  8. </dependency>

二、配置

  1. package com.yolo.springbootrabbitmqproducer.config;
  2. import org.springframework.amqp.core.AcknowledgeMode;
  3. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  4. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  5. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.config.ConfigurableBeanFactory;
  8. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.context.annotation.Scope;
  12. @Configuration
  13. public class RabbitMQConfiguration {
  14. @Bean
  15. public ConnectionFactory rabbitConnectionFactory() {
  16. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  17. String rabbitmqHost = "127.0.0.1";
  18. String rabbitmqPort = "5672";
  19. String rabbitmqUsername = "guest";
  20. String rabbitmqPassword = "guest";
  21. String rabbitmqVirtualHost = "/";
  22. connectionFactory.setHost(rabbitmqHost);
  23. connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
  24. connectionFactory.setUsername(rabbitmqUsername);
  25. connectionFactory.setPassword(rabbitmqPassword);
  26. connectionFactory.setVirtualHost(rabbitmqVirtualHost);
  27. // connectionFactory.setPublisherReturns(true);//开启return模式
  28. // connectionFactory.setPublisherConfirms(true);//开启confirm模式
  29. return connectionFactory;
  30. }
  31. @Bean(name = "rabbitTemplate")
  32. //必须是prototype类型
  33. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  34. public RabbitTemplate rabbitTemplate() {
  35. return new RabbitTemplate(rabbitConnectionFactory());
  36. }
  37. @Bean("customContainerFactory")
  38. public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
  39. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  40. //设置线程数
  41. factory.setConcurrentConsumers(1);
  42. //最大线程数
  43. factory.setMaxConcurrentConsumers(1);
  44. //设置为手动确认MANUAL(手动),AUTO(自动);
  45. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  46. // 设置prefetch
  47. factory.setPrefetchCount(1);
  48. configurer.configure(factory, connectionFactory);
  49. return factory;
  50. }
  51. }

三、消息确认机制(ack)

RabbitMQ默认的消息确认机制是:自动确认的

队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。

发送和监听消息

  1. @RestController
  2. public class AckSenderController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @GetMapping("/send")
  6. public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
  7. for (int i = 1; i <= 10; i++) {
  8. String msg = message + " ..." + i;
  9. System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
  10. rabbitTemplate.convertAndSend("helloWorldExchange","ack", msg);
  11. }
  12. }
  13. private int count1=1;
  14. private int count2=1;
  15. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  16. value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  17. ),
  18. exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  19. key = "ack"
  20. )
  21. })
  22. public void receive(Message message) throws InterruptedException {
  23. Thread.sleep(200);
  24. System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
  25. System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
  26. }
  27. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  28. value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  29. ),
  30. exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  31. key = "ack"
  32. )
  33. })
  34. public void receive2(Message message) throws InterruptedException {
  35. Thread.sleep(1000);
  36. System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
  37. System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
  38. }
  39. }

测试结果

http://localhost:8080/ack

消费者1号、2号分别拿到一条消息进行消费,但没有确认,处于阻塞状态,所以队列不会移除这两条消息,同时设置了prefetch=1,在消费者未确认之前不会重新推送消息给消费者

停止程序,发现2条未确认的消息会回到Ready里面等待重新消费

再次重启,再次消费2条消息,但仍未确认

访问/ack,再次发布消息,消息堆积

四、设置手动ack

修改消费者手动确认

  1. @RestController
  2. public class AckSenderController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. private int count1=1;
  6. private int count2=1;
  7. private int count3 = 1;
  8. @GetMapping("/ack")
  9. public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
  10. for (int i = 1; i <= 10; i++) {
  11. String msg = message + " ..." + i;
  12. System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
  13. rabbitTemplate.convertAndSend("helloWorldExchange","ack", msg);
  14. }
  15. }
  16. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  17. value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  18. ),
  19. exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  20. key = "ack"
  21. )
  22. })
  23. public void receive(Message message, Channel channel) throws InterruptedException, IOException {
  24. Thread.sleep(200);
  25. System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
  26. System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
  27. // 确认消息
  28. // 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增)
  29. // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
  30. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  31. }
  32. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  33. value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  34. ),
  35. exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  36. key = "ack"
  37. )
  38. })
  39. public void receive2(Message message,Channel channel,@Headers Map<String, Object> map) throws InterruptedException, IOException {
  40. Thread.sleep(600);
  41. System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
  42. System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
  43. // 确认消息
  44. channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);
  45. }
  46. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  47. value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  48. ),
  49. exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  50. key = "ack"
  51. )
  52. })
  53. public void receive3(Message message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws InterruptedException, IOException {
  54. Thread.sleep(1000);
  55. System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");
  56. System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++);
  57. // 确认消息
  58. channel.basicAck(deliveryTag, false);
  59. }
  60. }

手动确认测试结果

手动确认通过调用方法实现
basicAck(long deliveryTag, boolean multiple)
deliveryTag:交付标签,相当于消息ID 64位的长整数(从1开始递增)
multiple:false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签

这里发现程序刚刚启动就全部消费完了

继续发布,还是消费完成

修改消费者手动拒绝

  1. @RestController
  2. public class AckSenderController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. private int count1=1;
  6. private int count2=1;
  7. private int count3 = 1;
  8. @GetMapping("/ack")
  9. public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
  10. for (int i = 1; i <= 10; i++) {
  11. String msg = message + " ..." + i;
  12. System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
  13. rabbitTemplate.convertAndSend("helloWorldExchange","ack", msg);
  14. }
  15. }
  16. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  17. value = @Queue(value = "ackQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  18. ),
  19. exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  20. key = "ack"
  21. )
  22. })
  23. public void receive4(
  24. Message message, Channel channel) throws IOException, InterruptedException {
  25. Thread.sleep(200);
  26. System.out.println(" [ 消费者@4号 ] Received ==> '" + new String(message.getBody()) + "'");
  27. System.out.println(" [ 消费者@4号 ] 消息被我拒绝了:" + count1++);
  28. // 拒绝消息方式一
  29. // 第一个参数,交付标签
  30. // 第二个参数,false表示仅拒绝提供的交付标签;true表示批量拒绝所有消息,包括提供的交付标签
  31. // 第三个参数,false表示直接丢弃消息,true表示重新排队
  32. //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  33. // 拒绝消息方式二
  34. // 第一个参数,交付标签
  35. // 第二个参数,false表示直接丢弃消息,true表示重新排队
  36. // 跟basicNack的区别就是始终只拒绝提供的交付标签
  37. channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  38. }
  39. }

手动拒绝测试结果

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
这里是拒绝后,重新进入队列,所以消费的总是第一条消息并且循环不停
停止程序后,队列仍然是10条消息

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
改成false,拒绝后直接丢弃
重启后:

五、总结

  • 未确认:什么也不用写,消息不会移除,重复消费,积攒越来越多
  • 确认:channel.basicAck();确认后,消息从队列中移除
  • 拒绝:channel.basicNack()或channel.basicReject();拒绝后,消息先从队列中移除,然后可以选择重新排队,或者直接丢弃(丢弃还有一种选择,就是加入到死信队列中,用于追踪问题)

相关文章