RabbitMQ07_SpringBoot与RabbitMQ的整合

x33g5p2x  于2021-12-19 转载在 其他  
字(6.8k)|赞(0)|评价(0)|浏览(853)

RabbitMQ07_SpringBoot与RabbitMQ的整合

  • 创建 Springboot 项目,选择 Spring WebSpring for RabbitMQ
  • Pom文件:
  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-web</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-test</artifactId>
  13. <scope>test</scope>
  14. <exclusions>
  15. <exclusion>
  16. <groupId>org.junit.vintage</groupId>
  17. <artifactId>junit-vintage-engine</artifactId>
  18. </exclusion>
  19. </exclusions>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.amqp</groupId>
  23. <artifactId>spring-rabbit-test</artifactId>
  24. <scope>test</scope>
  25. </dependency>
  26. </dependencies>
  • application.yml 配置文件:
  1. spring:
  2. rabbitmq:
  3. host: 118.31.106.51
  4. port: 5672
  5. username: ems
  6. password: 123456
  7. virtual-host: /ems
  • 发送消息的测试类(注入 RabbitTemplate):
  1. package com.blu.test;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. import org.springframework.test.context.junit4.SpringRunner;
  8. import com.blu.RabbitmqSpringbootApplication;
  9. @SpringBootTest(classes = RabbitmqSpringbootApplication.class)
  10. @RunWith(SpringRunner.class)
  11. public class TestRabbitMQ {
  12. @Autowired
  13. private RabbitTemplate rabbitTemplate;
  14. @Test
  15. public void testHello() {
  16. rabbitTemplate.convertAndSend("hello","hello rabbitMQ!");
  17. }
  18. /** * work模型--公平消费 */
  19. @Test
  20. public void testWork() {
  21. for(int i=1;i<=20;i++) {
  22. rabbitTemplate.convertAndSend("work","work work!"+i);
  23. }
  24. }
  25. /** * fanout模型 参数1:交换机名称 参数2:routing key(无意义,填空字符串) */
  26. @Test
  27. public void testFanout() {
  28. rabbitTemplate.convertAndSend("logs","","fanout模型生产的消息");
  29. }
  30. /** * 订阅直连(Direct)模型,参数1:交换机名称,参数2:routing key */
  31. @Test
  32. public void testRoute() {
  33. rabbitTemplate.convertAndSend("directs","info","发送了info的key路由信息");
  34. }
  35. /** * 订阅(Topic)模型 */
  36. @Test
  37. public void testTopic() {
  38. rabbitTemplate.convertAndSend("topics","user.save","topic模型消息");
  39. }
  40. }
  • HelloCustomer(Hello模型的消息消费者):

注意:

1. 消费者类需要加上 @Component 注解
2. 在类上添加 @RabbitListener 注解,然后用 @RabbitHandler 指定回调方法
3. 或者直接在回调方法上添加 @RabbitListener 注解
4. 回调方法需要定义一个String 参数用于接收消息
5. 用 queuesToDeclare 参数绑定队列,用 @Queue 注解定义队列
6. @Queue 注解的 value 指定队列名,durable(持久化)默认为 “true”,autoDelete 默认为 “true”,是否独占默认为 "false"

  1. package com.blu.hello;
  2. import org.springframework.amqp.rabbit.annotation.Queue;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. /** * @Queue 注解默认定义的是持久化、非独占、非自动删除的队列 * @author BLU */
  7. @Component
  8. @RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "false",autoDelete = "true"))
  9. public class HelloCustomer {
  10. @RabbitHandler
  11. public void helloReceive(String message) {
  12. System.out.println("message = "+ message);
  13. }
  14. }
  • WorkConsumer(Work模型的消息消费者):
  1. package com.blu.work;
  2. import org.springframework.amqp.rabbit.annotation.Queue;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class WorkConsumer {
  7. @RabbitListener(queuesToDeclare = @Queue("work"))
  8. public void workReceive1(String message) {
  9. System.out.println("message1 = "+ message);
  10. }
  11. @RabbitListener(queuesToDeclare = @Queue("work"))
  12. public void workReceive2(String message) {
  13. System.out.println("message2 = "+ message);
  14. }
  15. }

消息接收详情:

  1. message2 = work work!2
  2. message1 = work work!1
  3. message1 = work work!3
  4. message2 = work work!4
  5. message1 = work work!5
  6. message2 = work work!6
  7. message1 = work work!7
  8. message2 = work work!8
  9. message1 = work work!9
  10. message2 = work work!10
  11. message2 = work work!12
  12. message1 = work work!11
  13. message2 = work work!14
  14. message1 = work work!13
  15. message2 = work work!16
  16. message1 = work work!15
  17. message2 = work work!18
  18. message1 = work work!17
  19. message1 = work work!19
  20. message2 = work work!20
  • FanoutConsumer(fanout (广播)模型的消息消费者):
  1. package com.blu.fanout;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class FanoutConsumer {
  9. @RabbitListener(bindings = { @QueueBinding(
  10. // 创建临时队列
  11. value = @Queue,
  12. // 绑定交换机,名称:logs,类型:fanout
  13. exchange = @Exchange(value = "logs", type = "fanout")) })
  14. public void fanoutReceive1(String message) {
  15. System.out.println("message1 = " + message);
  16. }
  17. @RabbitListener(bindings = { @QueueBinding(
  18. value = @Queue,
  19. exchange = @Exchange(value = "logs", type = "fanout")) })
  20. public void fanoutReceive2(String message) {
  21. System.out.println("message2 = " + message);
  22. }
  23. }

消息接收详情:

  1. message1 = fanout模型生产的消息
  2. message2 = fanout模型生产的消息
  • RouteConsumer(订阅直连(Direct)模型的消息消费者):
  1. package com.blu.route;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class RouteConsumer {
  9. @RabbitListener(bindings = {
  10. @QueueBinding(
  11. value = @Queue,
  12. exchange = @Exchange(value="directs",type = "direct"),
  13. key = {"info","error","debug","warn"})
  14. })
  15. public void RouteReceive1(String message) {
  16. System.out.println("message1 = "+message);
  17. }
  18. @RabbitListener(bindings = {
  19. @QueueBinding(
  20. value = @Queue,
  21. exchange = @Exchange(value="directs",type = "direct"),
  22. key = {"error"})
  23. })
  24. public void RouteReceive2(String message) {
  25. System.out.println("message2 = "+message);
  26. }
  27. }

消息接收详情:

  1. message1 = 发送了infokey路由信息
  • TopicConsumer(订阅(Topic)模型的消息消费者):
  1. package com.blu.topic;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class TopicConsumer {
  9. @RabbitListener(bindings = {
  10. @QueueBinding(value = @Queue,exchange = @Exchange(value = "topics",type = "topic"),key= {"user.*"})
  11. })
  12. public void TopicReceive1(String message) {
  13. System.out.println("message1 = "+message);
  14. }
  15. @RabbitListener(bindings = {
  16. @QueueBinding(value = @Queue,exchange = @Exchange(value = "topics",type = "topic"),key= {"order.#","product.*"})
  17. })
  18. public void TopicReceive2(String message) {
  19. System.out.println("message2 = "+message);
  20. }
  21. }

消息接收详情:

  1. message1 = topic模型消息
上一篇:Topic模型

相关文章