springboot:整合rabbitmq之快速入门

x33g5p2x  于2022-06-16 转载在 Spring  
字(6.9k)|赞(0)|评价(0)|浏览(686)

springboot:整合rabbitmq之快速入门

一、依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-amqp</artifactId>
  8. </dependency>

二、配置类

  1. package com.yolo.springbootrabbitmqproducer.config;
  2. import org.springframework.amqp.core.AcknowledgeMode;
  3. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  4. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  5. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.config.ConfigurableBeanFactory;
  8. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.context.annotation.Scope;
  12. @Configuration
  13. public class RabbitMQConfiguration {
  14. @Bean
  15. public ConnectionFactory rabbitConnectionFactory() {
  16. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  17. String rabbitmqHost = "127.0.0.1";
  18. String rabbitmqPort = "5672";
  19. String rabbitmqUsername = "guest";
  20. String rabbitmqPassword = "guest";
  21. String rabbitmqVirtualHost = "/";
  22. connectionFactory.setHost(rabbitmqHost);
  23. connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
  24. connectionFactory.setUsername(rabbitmqUsername);
  25. connectionFactory.setPassword(rabbitmqPassword);
  26. connectionFactory.setVirtualHost(rabbitmqVirtualHost);
  27. // connectionFactory.setPublisherReturns(true);//开启return模式
  28. // connectionFactory.setPublisherConfirms(true);//开启confirm模式
  29. return connectionFactory;
  30. }
  31. @Bean(name = "rabbitTemplate")
  32. //必须是prototype类型
  33. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  34. public RabbitTemplate rabbitTemplate() {
  35. return new RabbitTemplate(rabbitConnectionFactory());
  36. }
  37. @Bean("customContainerFactory")
  38. public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
  39. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  40. factory.setConcurrentConsumers(1); //设置线程数
  41. factory.setMaxConcurrentConsumers(1); //最大线程数
  42. // factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认
  43. configurer.configure(factory, connectionFactory);
  44. return factory;
  45. }
  46. }

三、基本消息模型

消息发布和监听

  1. @RestController
  2. public class ProducerTestOneController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @GetMapping("/send")
  6. public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
  7. for (int i = 1; i <= 10; i++) {
  8. String msg = message + " ..." + i;
  9. System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
  10. rabbitTemplate.convertAndSend("helloWorldExchange","helloWorld", msg);
  11. }
  12. }
  13. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  14. value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  15. ),
  16. exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  17. key = "helloWorld"
  18. )
  19. })
  20. public void receive(String message) {
  21. System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
  22. }
  23. }

启动项目查看rabbitmq可视化界面

Ready:表示待消费数量;队列中拥有可以被消费者消费的消息数量。
Unacked:表示待确认数量;队列分配消息给消费者时,给该条消息一个待确认状态,当消费者确认消息之后,队列才会移除该条消息。
Total:表示待消费数和待确认数的总和

发送消息测试

访问:http://localhost:8080/send

这里采用的是自动ack机制

新增一个接受者

  1. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  2. value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  3. ),
  4. exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  5. key = "helloWorld"
  6. )
  7. })
  8. public void receive2(Message message) {
  9. System.out.println(" [ 消费者@2 ] Received ==> '" + new String(message.getBody()) + "'");
  10. }

重新启动,访问:localhost:8080/send

可以看到消息被平均消费了

四、竞争消费者模式

队列的消息分配方式默认是平均分配,即第一条消息分配给一个消息者,第二条消息就分配给另一个消息者,以此类推…

上面示例有2个消费者监听,由于只是简单的打印语句,所以看不出有什么问题。
我进行修改一下,通过设置线程休眠时间来表示消费者处理消费的任务时间

  1. @RestController
  2. public class ProducerTestOneController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @GetMapping("/send")
  6. public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
  7. for (int i = 1; i <= 10; i++) {
  8. String msg = message + " ..." + i;
  9. System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
  10. rabbitTemplate.convertAndSend("helloWorldExchange","helloWorld", msg);
  11. }
  12. }
  13. private int count1=1;
  14. private int count2=1;
  15. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  16. value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  17. ),
  18. exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  19. key = "helloWorld"
  20. )
  21. })
  22. public void receive(Message message) throws InterruptedException {
  23. Thread.sleep(200);
  24. System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
  25. System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
  26. }
  27. @RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
  28. value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
  29. ),
  30. exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
  31. key = "helloWorld"
  32. )
  33. })
  34. public void receive2(Message message) throws InterruptedException {
  35. Thread.sleep(1000);
  36. System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
  37. System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
  38. }
  39. }

现在就能很明显的看出,消费者1号很快地处理完消息后就处于空闲状态;而消费者2号却一直很忙碌。当消息数量成千上万的时候,由消费者2号处理的消息会堆积很多,达不到时效性。

针对这种问题,rabbitmq提供了一种解决方案。
设置prefetch参数=1,实现原理是:队列只会分配一条消息给对应的监听消费者,收到消费者的确认回复之后才会重新分配另一条消息。

方式一(局部)

这里需要每一个接受者指定containerFactory

  1. @Bean("customContainerFactory")
  2. public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
  3. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  4. factory.setConcurrentConsumers(1); //设置线程数
  5. factory.setMaxConcurrentConsumers(1); //最大线程数
  6. // factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认
  7. // 设置prefetch
  8. factory.setPrefetchCount(1);
  9. configurer.configure(factory, connectionFactory);
  10. return factory;
  11. }

1号处理了8条消息,2号2条,工作效率提高了不少

方式二(全局)

这里接受者不需要指定containerFactory

  1. spring:
  2. rabbitmq:
  3. port: 5672
  4. host: 127.0.0.1
  5. username: guest
  6. password: guest
  7. listener:
  8. simple:
  9. prefetch: 1

相关文章