springboot:整合rabbitmq之交换机

x33g5p2x  于2022-06-16 转载在 Spring  
字(13.2k)|赞(0)|评价(0)|浏览(464)

springboot:整合rabbitmq之交换机

环境准备

依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-amqp</artifactId>
  8. </dependency>

配置

  1. @Configuration
  2. public class RabbitMQConfiguration {
  3. @Bean
  4. public ConnectionFactory rabbitConnectionFactory() {
  5. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  6. String rabbitmqHost = "127.0.0.1";
  7. String rabbitmqPort = "5672";
  8. String rabbitmqUsername = "guest";
  9. String rabbitmqPassword = "guest";
  10. String rabbitmqVirtualHost = "/";
  11. connectionFactory.setHost(rabbitmqHost);
  12. connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
  13. connectionFactory.setUsername(rabbitmqUsername);
  14. connectionFactory.setPassword(rabbitmqPassword);
  15. connectionFactory.setVirtualHost(rabbitmqVirtualHost);
  16. // connectionFactory.setPublisherReturns(true);//开启return模式
  17. // connectionFactory.setPublisherConfirms(true);//开启confirm模式
  18. return connectionFactory;
  19. }
  20. @Bean(name = "rabbitTemplate")
  21. //必须是prototype类型
  22. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  23. public RabbitTemplate rabbitTemplate() {
  24. return new RabbitTemplate(rabbitConnectionFactory());
  25. }
  26. @Bean("customContainerFactory")
  27. public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
  28. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  29. //设置线程数
  30. factory.setConcurrentConsumers(1);
  31. //最大线程数
  32. factory.setMaxConcurrentConsumers(1);
  33. // //设置为手动确认MANUAL(手动),AUTO(自动);
  34. // factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  35. // 设置prefetch
  36. factory.setPrefetchCount(1);
  37. configurer.configure(factory, connectionFactory);
  38. return factory;
  39. }
  40. }

一、交换机

生产者发送消息给交换机,由交换机转发消息给队列,交换机可以转发给所有绑定它的队列,也可以转发给符合路由规则的队列,交换机本身不会存储消息,如果没有绑定任何队列,消息就会丢失

  • 发布订阅模型
    发布订阅使用的交换机是Fanout交换机,也叫广播式交换机
    广播式交换机:fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列
  • 路由模型
    路由式交换机:direct交换器相对来说比较简单,匹配规则为:路由键完全匹配,消息就被投送到相关的队列
  • 主题模型
    主题式交换机:topic交换器采用模糊匹配路由键的原则进行转发消息到队列中
  • 头部订阅
    头部订阅(headers 交换机):headers没有路由键,是根据消息头部header的键值对进行匹配,可以完全匹配也可以匹配任意一对键值对

二、交换机模式

发布订阅

生产者生产10条消息,发送给FanoutExchange,空字符串""表示不需要指定路由键

消费者A监听队列A,消费者B、B2监听队列B

  1. @RestController
  2. public class FanoutExchangeController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @GetMapping("/fanout")
  6. public void fanout() {
  7. for (int i = 1; i <= 10; i++) {
  8. String message = "Hello World ..." + i;
  9. rabbitTemplate.convertAndSend("fanout_exchange", "", message);
  10. System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
  11. }
  12. }
  13. private int count1 = 1;
  14. private int count2 = 1;
  15. private int count3 = 1;
  16. @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
  17. value = @Queue(value = "fanout_a", declare = "true"),
  18. exchange = @Exchange(value = "fanout_exchange", type = ExchangeTypes.FANOUT, durable = "true")
  19. )
  20. })
  21. public void receiveA(String message) throws InterruptedException {
  22. System.out.println(" [ 消费者@A号 ] Received ==> '" + message + "'");
  23. Thread.sleep(200);
  24. System.out.println(" [ 消费者@A号 ] Dealt with:" + count1++);
  25. }
  26. @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
  27. value = @Queue(value = "fanout_b", declare = "true"),
  28. exchange = @Exchange(value = "fanout_exchange", type = ExchangeTypes.FANOUT, durable = "true")
  29. )
  30. })
  31. public void receiveB(String message) throws InterruptedException {
  32. System.out.println(" [ 消费者@B号 ] Received ==> '" + message + "'");
  33. Thread.sleep(200);
  34. System.out.println(" [ 消费者@B号 ] Dealt with:" + count2++);
  35. }
  36. @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
  37. value = @Queue(value = "fanout_b", declare = "true"),
  38. exchange = @Exchange(value = "fanout_exchange", type = ExchangeTypes.FANOUT, durable = "true")
  39. )
  40. })
  41. public void receiveB2(String message) throws InterruptedException {
  42. System.out.println(" [ 消费者@B2号 ] Received ==> '" + message + "'");
  43. Thread.sleep(500);
  44. System.out.println(" [ 消费者@B2号 ] Dealt with:" + count3++);
  45. }
  46. }

运行结果

生产者生产了10条消息发送给交换机,fanout交换机将消息分别转发给绑定它的队列A、B;
队列A、B拿到内容一样的10条消息,队列A只有一个消费者A,所以由消费者A处理所有消息
队列B有两个消费者B、B2,分配模式是按能力分配,所以消费者B处理的多

路由模式

  1. @RestController
  2. public class DirectController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. private int count1 = 1;
  6. private int count2 = 1;
  7. private int count3 = 1;
  8. @GetMapping("/direct")
  9. public void direct() {
  10. for (int i = 1; i <= 30; i++) {
  11. String message = "Hello World ..." + i;
  12. System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
  13. if (i % 3 == 0) {
  14. rabbitTemplate.convertAndSend("direct_exchange", "direct_a", message);
  15. } else if (i % 3 == 1) {
  16. rabbitTemplate.convertAndSend("direct_exchange", "direct_b", message);
  17. } else {
  18. rabbitTemplate.convertAndSend("direct_exchange", "direct_b2", message);
  19. }
  20. }
  21. }
  22. /**
  23. * name:交换机名称
  24. * durable:设置是否持久,设置为true则将Exchange存盘,即使服务器重启数据也不会丢失
  25. * autoDelete:设置是否自动删除,当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange,简单来说也就是如果该Exchange没有和任何队列Queue绑定则删除
  26. * internal:设置是否是RabbitMQ内部使用,默认false。如果设置为 true ,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  27. * @param message
  28. * @throws InterruptedException
  29. */
  30. @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
  31. value = @Queue(value = "direct_a", declare = "true"),
  32. exchange = @Exchange(value = "direct_exchange", type = ExchangeTypes.DIRECT, durable = "true"),
  33. key = "direct_a"
  34. )
  35. })
  36. public void receiveA(String message) throws InterruptedException {
  37. System.out.println(" [ 消费者@A号 ] Received ==> '" + message + "'");
  38. Thread.sleep(200);
  39. System.out.println(" [ 消费者@A号 ] Dealt with:" + count1++);
  40. }
  41. @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
  42. value = @Queue(value = "direct_b", declare = "true"),
  43. exchange = @Exchange(value = "direct_exchange", type = ExchangeTypes.DIRECT, durable = "true"),
  44. key = "direct_b"
  45. )
  46. })
  47. public void receiveB(String message) throws InterruptedException {
  48. System.out.println(" [ 消费者@B号 ] Received ==> '" + message + "'");
  49. Thread.sleep(200);
  50. System.out.println(" [ 消费者@B号 ] Dealt with:" + count2++);
  51. }
  52. @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
  53. value = @Queue(value = "direct_b", declare = "true"),
  54. exchange = @Exchange(value = "direct_exchange", type = ExchangeTypes.DIRECT, durable = "true"),
  55. key = "direct_b2"
  56. )
  57. })
  58. public void receiveB2(String message) throws InterruptedException {
  59. System.out.println(" [ 消费者@B2号 ] Received ==> '" + message + "'");
  60. Thread.sleep(500);
  61. System.out.println(" [ 消费者@B2号 ] Dealt with:" + count3++);
  62. }
  63. }

运行结果

两个队列A、B,A有一个路由键,而B有两个路由键;
生产者生产30条消息发送给交换机,交换机根据路由键转发给队列,队列A分配10条消息,队列B分配20条消息;
同时队列D被两个消费者监听

主题模式(通配者模式)

  1. @RestController
  2. public class TopicController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. private int count1 = 1;
  6. private int count2 = 1;
  7. private int count3 = 1;
  8. @GetMapping("/topic")
  9. public void topic() {
  10. for (int i = 1; i <= 30; i++) {
  11. String message = "Hello World ..." + i;
  12. System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
  13. if (i % 3 == 0) {
  14. // topic.yzm.key,可以匹配 topic.yzm.* 和 topic.#
  15. rabbitTemplate.convertAndSend("topic_exchange", "topic.yzm.key", message);
  16. } else if (i % 3 == 1) {
  17. // topic.yzm.yzm,可以匹配 topic.yzm.* 、 topic.# 和 topic.*.yzm
  18. rabbitTemplate.convertAndSend("topic_exchange", "topic.yzm.yzm", message);
  19. } else {
  20. // topic.key.yzm,可以匹配 topic.# 和 topic.*.yzm
  21. rabbitTemplate.convertAndSend("topic_exchange", "topic.key.yzm", message);
  22. }
  23. }
  24. }
  25. @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
  26. value = @Queue(value = "topic_e", declare = "true"),
  27. exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC, durable = "true"),
  28. key = "topic.yzm.*"
  29. )
  30. })
  31. public void receiveE(String message) throws InterruptedException {
  32. System.out.println(" [ 消费者@E号 ] Received ==> '" + message + "'");
  33. Thread.sleep(200);
  34. System.out.println(" [ 消费者@E号 ] Dealt with:" + count1++);
  35. }
  36. @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
  37. value = @Queue(value = "topic_f", declare = "true"),
  38. exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC, durable = "true"),
  39. key = "topic.#"
  40. )
  41. })
  42. public void receiveF(String message) throws InterruptedException {
  43. System.out.println(" [ 消费者@F号 ] Received ==> '" + message + "'");
  44. Thread.sleep(200);
  45. System.out.println(" [ 消费者@F号 ] Dealt with:" + count2++);
  46. }
  47. @RabbitListener(containerFactory = "customContainerFactory", bindings = {@QueueBinding(
  48. value = @Queue(value = "topic_g", declare = "true"),
  49. exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC, durable = "true"),
  50. key = "topic.*.yzm"
  51. )
  52. })
  53. public void receiveG(String message) throws InterruptedException {
  54. System.out.println(" [ 消费者@G号 ] Received ==> '" + message + "'");
  55. Thread.sleep(200);
  56. System.out.println(" [ 消费者@G号 ] Dealt with:" + count3++);
  57. }
  58. }

分析结果:

E、G二十条,F三十条

头部模式

配置类

  1. @Configuration
  2. public class RabbitMQConfiguration {
  3. @Bean
  4. public ConnectionFactory rabbitConnectionFactory() {
  5. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  6. String rabbitmqHost = "127.0.0.1";
  7. String rabbitmqPort = "5672";
  8. String rabbitmqUsername = "guest";
  9. String rabbitmqPassword = "guest";
  10. String rabbitmqVirtualHost = "/";
  11. connectionFactory.setHost(rabbitmqHost);
  12. connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
  13. connectionFactory.setUsername(rabbitmqUsername);
  14. connectionFactory.setPassword(rabbitmqPassword);
  15. connectionFactory.setVirtualHost(rabbitmqVirtualHost);
  16. // connectionFactory.setPublisherReturns(true);//开启return模式
  17. // connectionFactory.setPublisherConfirms(true);//开启confirm模式
  18. return connectionFactory;
  19. }
  20. @Bean(name = "rabbitTemplate")
  21. //必须是prototype类型
  22. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  23. public RabbitTemplate rabbitTemplate() {
  24. return new RabbitTemplate(rabbitConnectionFactory());
  25. }
  26. @Bean("customContainerFactory")
  27. public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
  28. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  29. //设置线程数
  30. factory.setConcurrentConsumers(1);
  31. //最大线程数
  32. factory.setMaxConcurrentConsumers(1);
  33. // //设置为手动确认MANUAL(手动),AUTO(自动);
  34. // factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  35. // 设置prefetch
  36. factory.setPrefetchCount(1);
  37. configurer.configure(factory, connectionFactory);
  38. return factory;
  39. }
  40. public static final String QUEUE_X = "headers_x";
  41. public static final String QUEUE_Y = "headers_y";
  42. public static final String HEADER_EXCHANGE = "headers_exchange";
  43. @Bean
  44. public Queue queueX() {
  45. return new Queue(QUEUE_X);
  46. }
  47. @Bean
  48. public Queue queueY() {
  49. return new Queue(QUEUE_Y);
  50. }
  51. @Bean
  52. public HeadersExchange headersExchange() {
  53. return ExchangeBuilder.headersExchange(HEADER_EXCHANGE).build();
  54. }
  55. @Bean
  56. public Binding bindingX() {
  57. Map<String, Object> map = new HashMap<>();
  58. map.put("key1", "value1");
  59. map.put("name", "yzm");
  60. // whereAll:表示完全匹配
  61. return BindingBuilder.bind(queueX()).to(headersExchange()).whereAll(map).match();
  62. }
  63. @Bean
  64. public Binding bindingY() {
  65. Map<String, Object> map = new HashMap<>();
  66. map.put("key2", "value2");
  67. map.put("name", "yzm");
  68. // whereAny:表示只要有一对键值对能匹配就可以
  69. return BindingBuilder.bind(queueY()).to(headersExchange()).whereAny(map).match();
  70. }
  71. }

消息发送和接受

  1. @RestController
  2. public class HeadersController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @GetMapping("/headers")
  6. public void headers() {
  7. String s = "Hello World";
  8. System.out.println(" [ 生产者 ] Sent ==> '" + s + "'");
  9. MessageProperties messageProperties = new MessageProperties();
  10. messageProperties.setHeader("key1", "value1");
  11. messageProperties.setHeader("name", "yzm");
  12. Message message = new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties);
  13. rabbitTemplate.convertAndSend("headers_exchange", "", message);
  14. }
  15. @GetMapping("/headers2")
  16. public void headers2() {
  17. String s = "Hello World";
  18. System.out.println(" [ 生产者 ] Sent ==> '" + s + "'");
  19. MessageProperties messageProperties = new MessageProperties();
  20. messageProperties.setHeader("key3", "value3");
  21. messageProperties.setHeader("name", "yzm");
  22. Message message = new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties);
  23. rabbitTemplate.convertAndSend("headers_exchange", "", message);
  24. }
  25. @RabbitListener(queues = RabbitMQConfiguration.QUEUE_X)
  26. public void receiveX(String message) {
  27. System.out.println(" [ 消费者@X号 ] Received ==> '" + message + "'");
  28. }
  29. @RabbitListener(queues = RabbitMQConfiguration.QUEUE_Y)
  30. public void receiveY(String message) {
  31. System.out.println(" [ 消费者@Y号 ] Received ==> '" + message + "'");
  32. }
  33. }

测试结果

发送请求:http://localhost:8080/headers

生产消息时,头部键值对有:“key1”=“value1"和"name”=“yzm”,跟X队列能完全匹配上,跟Y队列能匹配上其中一个,所以两个消费者都能消费到消息

发送请求:http://localhost:8080/headers2

只有Y队列能匹配上一个键值对

相关文章