SpringBoot整合Rabbitmq

x33g5p2x  于2022-02-11 转载在 Spring  
字(7.3k)|赞(0)|评价(0)|浏览(512)

1.安装rabbitmq

可参考这篇文章: https://www.jianshu.com/p/14ffe0f3db94

2.SpringBoot整合Rabbitmq案例

2.1 Direct模式(点对点)

1.docker上启动rabbitmq实例
2.pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent> <!-- parent POM; the dependencies and plugin below declare no version of their own -->
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.6.3</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.yl</groupId>
  12. <artifactId>amqp</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>amqp</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>11</java.version> <!-- Java version property for this project -->
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-amqp</artifactId> <!-- AMQP starter; the Java listings in this article import org.springframework.amqp classes -->
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-web</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-starter-test</artifactId>
  31. <scope>test</scope> <!-- test scope only -->
  32. </dependency>
  33. <dependency>
  34. <groupId>org.springframework.amqp</groupId>
  35. <artifactId>spring-rabbit-test</artifactId>
  36. <scope>test</scope> <!-- test scope only -->
  37. </dependency>
  38. </dependencies>
  39. <build>
  40. <plugins>
  41. <plugin>
  42. <groupId>org.springframework.boot</groupId>
  43. <artifactId>spring-boot-maven-plugin</artifactId>
  44. </plugin>
  45. </plugins>
  46. </build>
  47. </project>

3.application.properties

  1. # RabbitMQ connection settings
  2. # Broker host
  3. spring.rabbitmq.host=192.168.244.135
  4. # Broker port
  5. spring.rabbitmq.port=5672
  6. # Username. NOTE(review): RabbitMQ normally limits the default guest account to localhost connections - confirm this broker accepts it from a remote host
  7. spring.rabbitmq.username=guest
  8. # Password
  9. spring.rabbitmq.password=guest

4.Direct模式配置类

  1. package com.yl.amqp.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /** Direct-mode wiring: one queue, one direct exchange, and the binding that joins them. */
  9. @Configuration
  10. public class DirectConfig {
  11.     private static final String QUEUE_NAME = "yl-queue";
  12.     private static final String EXCHANGE_NAME = "yl-direct";
  13.     private static final String ROUTING_KEY = "direct";
  14.     /** Declares the queue "yl-queue" as durable. */
  15.     @Bean
  16.     Queue directQueue() {
  17.         final boolean durable = true;
  18.         return new Queue(QUEUE_NAME, durable);
  19.     }
  20.     /** Declares the direct exchange "yl-direct": durable, not auto-deleted. */
  21.     @Bean
  22.     DirectExchange directExchange() {
  23.         final boolean durable = true;
  24.         final boolean autoDelete = false;
  25.         return new DirectExchange(EXCHANGE_NAME, durable, autoDelete);
  26.     }
  27.     /** Binds the queue to the exchange under the routing key "direct". */
  28.     @Bean
  29.     Binding directBiding() {
  30.         final Queue queue = directQueue();
  31.         final DirectExchange exchange = directExchange();
  32.         return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
  33.     }
  34. }

5.监听消息队列

  1. package com.yl.amqp.receicer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /** Listener component for the Direct-mode example. */
  5. @Component
  6. public class DirectReceiver {
  7.     private static final String QUEUE_NAME = "yl-queue";
  8.     /**
  9.      * Listens on "yl-queue" and prints each message it is handed to standard output.
  10.      *
  11.      * @param msg the message payload as a string
  12.      */
  13.     @RabbitListener(queues = QUEUE_NAME)
  14.     public void handler(String msg) {
  15.         System.out.println(msg);
  16.     }
  17. }

6.测试

2.2 Fanout模式(点对面)

1.Fanout模式配置

  1. package com.yl.amqp.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /** Fanout-mode wiring: two queues, both bound to a single fanout exchange. */
  9. @Configuration
  10. public class FanoutConfig {
  11.     private static final String FIRST_QUEUE = "queue-one";
  12.     private static final String SECOND_QUEUE = "queue-two";
  13.     private static final String EXCHANGE_NAME = "yl-fanout";
  14.     private static final boolean DURABLE = true;
  15.     private static final boolean AUTO_DELETE = false;
  16.     /** Declares the durable queue "queue-one". */
  17.     @Bean
  18.     Queue queueOne() {
  19.         return new Queue(FIRST_QUEUE, DURABLE);
  20.     }
  21.     /** Declares the durable queue "queue-two". */
  22.     @Bean
  23.     Queue queueTwo() {
  24.         return new Queue(SECOND_QUEUE, DURABLE);
  25.     }
  26.     /** Declares the fanout exchange "yl-fanout": durable, not auto-deleted. */
  27.     @Bean
  28.     FanoutExchange fanoutExchange() {
  29.         return new FanoutExchange(EXCHANGE_NAME, DURABLE, AUTO_DELETE);
  30.     }
  31.     /** Binds "queue-one" to the fanout exchange; no routing key is supplied. */
  32.     @Bean
  33.     Binding bindingOne() {
  34.         final Queue queue = queueOne();
  35.         return BindingBuilder.bind(queue).to(fanoutExchange());
  36.     }
  37.     /** Binds "queue-two" to the fanout exchange; no routing key is supplied. */
  38.     @Bean
  39.     Binding bindingTwo() {
  40.         final Queue queue = queueTwo();
  41.         return BindingBuilder.bind(queue).to(fanoutExchange());
  42.     }
  43. }

2.监听消息

  1. package com.yl.amqp.receicer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /** Listener component for the Fanout-mode example; one listener method per queue. */
  5. @Component
  6. public class FanoutReceiver {
  7.     private static final String FIRST_QUEUE = "queue-one";
  8.     private static final String SECOND_QUEUE = "queue-two";
  9.     /** Listens on "queue-one" and prints the message prefixed with "handle1()". */
  10.     @RabbitListener(queues = FIRST_QUEUE)
  11.     public void handler1(String msg) {
  12.         final String line = "handle1()" + msg;
  13.         System.out.println(line);
  14.     }
  15.     /** Listens on "queue-two" and prints the message prefixed with "handle2()". */
  16.     @RabbitListener(queues = SECOND_QUEUE)
  17.     public void handle2(String msg) {
  18.         final String line = "handle2()" + msg;
  19.         System.out.println(line);
  20.     }
  21. }

3.测试

2.3 Topic模式(点对面)

1.Topic模式配置

  1. package com.yl.amqp.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /** Topic-mode wiring: three queues bound to one topic exchange, each with its own routing pattern. */
  9. @Configuration
  10. public class TopicConfig {
  11.     private static final String EXCHANGE_NAME = "yl-topic";
  12.     private static final boolean DURABLE = true;
  13.     private static final boolean AUTO_DELETE = false;
  14.     /** Declares the durable queue "huawei". */
  15.     @Bean
  16.     Queue huawei() {
  17.         return new Queue("huawei", DURABLE);
  18.     }
  19.     /** Declares the durable queue "apple". */
  20.     @Bean
  21.     Queue apple() {
  22.         return new Queue("apple", DURABLE);
  23.     }
  24.     /** Declares the durable queue "phone". */
  25.     @Bean
  26.     Queue phone() {
  27.         return new Queue("phone", DURABLE);
  28.     }
  29.     /** Declares the topic exchange "yl-topic": durable, not auto-deleted. */
  30.     @Bean
  31.     TopicExchange topicExchange() {
  32.         return new TopicExchange(EXCHANGE_NAME, DURABLE, AUTO_DELETE);
  33.     }
  34.     /** Binds the "huawei" queue with the routing pattern "huawei.#". */
  35.     @Bean
  36.     Binding huaweiBinding() {
  37.         final Queue queue = huawei();
  38.         final TopicExchange exchange = topicExchange();
  39.         return BindingBuilder.bind(queue).to(exchange).with("huawei.#");
  40.     }
  41.     /** Binds the "apple" queue with the routing pattern "apple.#". */
  42.     @Bean
  43.     Binding appleBinding() {
  44.         final Queue queue = apple();
  45.         final TopicExchange exchange = topicExchange();
  46.         return BindingBuilder.bind(queue).to(exchange).with("apple.#");
  47.     }
  48.     /** Binds the "phone" queue with the routing pattern "#.phone.#". */
  49.     @Bean
  50.     Binding phoneBinding() {
  51.         final Queue queue = phone();
  52.         final TopicExchange exchange = topicExchange();
  53.         return BindingBuilder.bind(queue).to(exchange).with("#.phone.#");
  54.     }
  55. }

2.监听消息

  1. package com.yl.amqp.receicer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /** Listener component for the Topic-mode example; one listener method per queue. */
  5. @Component
  6. public class TopicReceiver {
  7.     /** Listens on the "huawei" queue and prints the message prefixed with "handle1()". */
  8.     @RabbitListener(queues = "huawei")
  9.     public void handle1(String msg) {
  10.         final String line = "handle1()" + msg;
  11.         System.out.println(line);
  12.     }
  13.     /** Listens on the "apple" queue and prints the message prefixed with "handle2()". */
  14.     @RabbitListener(queues = "apple")
  15.     public void handle2(String msg) {
  16.         final String line = "handle2()" + msg;
  17.         System.out.println(line);
  18.     }
  19.     /** Listens on the "phone" queue and prints the message prefixed with "handle3()". */
  20.     @RabbitListener(queues = "phone")
  21.     public void handle3(String msg) {
  22.         final String line = "handle3()" + msg;
  23.         System.out.println(line);
  24.     }
  25. }

3.测试1

4.测试2

2.4 Headers模式(很少用到)

1.Headers模式的配置

  1. package com.yl.amqp.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.HeadersExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. /** Headers-mode wiring: two queues bound to one headers exchange by header conditions. */
  11. @Configuration
  12. public class HeaderConfig {
  13.     private static final String AGE_QUEUE = "queue-age";
  14.     private static final String NAME_QUEUE = "queue-name";
  15.     private static final String EXCHANGE_NAME = "yl-headers";
  16.     /** Declares the durable queue "queue-age". */
  17.     @Bean
  18.     Queue ageQueue() {
  19.         final boolean durable = true;
  20.         return new Queue(AGE_QUEUE, durable);
  21.     }
  22.     /** Declares the durable queue "queue-name". */
  23.     @Bean
  24.     Queue nameQueue() {
  25.         final boolean durable = true;
  26.         return new Queue(NAME_QUEUE, durable);
  27.     }
  28.     /** Declares the headers exchange "yl-headers": durable, not auto-deleted. */
  29.     @Bean
  30.     HeadersExchange headersExchange() {
  31.         final boolean durable = true;
  32.         final boolean autoDelete = false;
  33.         return new HeadersExchange(EXCHANGE_NAME, durable, autoDelete);
  34.     }
  35.     /** Binds "queue-age" using a whereAny match against the header map {age=18}. */
  36.     @Bean
  37.     Binding ageBinding() {
  38.         final Map<String, Object> wantedHeaders = new HashMap<>();
  39.         wantedHeaders.put("age", 18);
  40.         return BindingBuilder.bind(ageQueue())
  41.                 .to(headersExchange())
  42.                 .whereAny(wantedHeaders)
  43.                 .match();
  44.     }
  45.     /** Binds "queue-name" on the condition that a "name" header exists. */
  46.     @Bean
  47.     Binding nameBinding() {
  48.         return BindingBuilder.bind(nameQueue())
  49.                 .to(headersExchange())
  50.                 .where("name")
  51.                 .exists();
  52.     }
  53. }

2.监听消息

  1. package com.yl.amqp.receicer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /** Listener component for the Headers-mode example; one listener method per queue. */
  5. @Component
  6. public class HeadersReceiver {
  7.     private static final String AGE_QUEUE = "queue-age";
  8.     private static final String NAME_QUEUE = "queue-name";
  9.     /** Listens on "queue-age" and prints the message prefixed with "handle1(),queue-age:". */
  10.     @RabbitListener(queues = AGE_QUEUE)
  11.     public void handle1(String msg) {
  12.         final String line = "handle1(),queue-age:" + msg;
  13.         System.out.println(line);
  14.     }
  15.     /** Listens on "queue-name" and prints the message prefixed with "handle2(),queue-name:". */
  16.     @RabbitListener(queues = NAME_QUEUE)
  17.     public void handle2(String msg) {
  18.         final String line = "handle2(),queue-name:" + msg;
  19.         System.out.println(line);
  20.     }
  21. }

3.测试1

4.测试2

相关文章