springboot(五)rabbitmq的使用

x33g5p2x  于2021-12-17 转载在 其他  
字(8.3k)|赞(0)|评价(0)|浏览(525)

rabbitmq 介绍

       rabbitmq是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 rabbitmq主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

rabbitmq是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

交换机的四种类型:

      交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout

1.Direct Exchange

      Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

2.Topic Exchange

*       *Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

在这种交换机模式下:

  • 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
  • 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是 agreements.eu.berlin.#,那么,以agreements.eu.berlin 开头的路由键都是可以的。

topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:

  • *表示一个词.
  • #表示零个或多个词.

3.Fanout Exchange

     Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。

4.Headers Exchange

     Headers Exchange 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

安装rabbitmq

     使用docker镜像安装rabbitmq,简单快捷。教程地址:

     https://blog.csdn.net/xu12387/article/details/85049486

springboot集成rabbitmq

     新建项目:springboot-rabbitmq ,打开pom.xml文件加入相关依赖

  1. <dependencies>
  2. <!--测试-->
  3. <dependency>
  4. <groupId>org.springframework.boot</groupId>
  5. <artifactId>spring-boot-starter-test</artifactId>
  6. <scope>test</scope>
  7. </dependency>
  8. <!--rabbitmq-->
  9. <dependency>
  10. <groupId>org.springframework.boot</groupId>
  11. <artifactId>spring-boot-starter-amqp</artifactId>
  12. </dependency>
  13. </dependencies>

      配置application.properties,内容如下:

  1. #rabbitmq主机
  2. spring.rabbitmq.host=10.24.247.23
  3. #端口
  4. spring.rabbitmq.port=5672
  5. #用户名
  6. spring.rabbitmq.username=admin
  7. #密码
  8. spring.rabbitmq.password=123456

1.交换机的四种模式实现:Direct Exchange模式(默认模式):

         配置文件RabbitMQConfig .java,内容如下:

  1. @Configuration
  2. public class RabbitMQConfig {
  3. public static final String QUEUE = "queue";
  4. /**
  5. * Directm模式
  6. */
  7. @Bean
  8. public Queue queue() {
  9. //true代表是否持久化
  10. return new Queue(QUEUE, true);
  11. }
  12. }

       发送者Sender.java,内容如下:

  1. @Component
  2. public class Sender {
  3. private static Logger log = LoggerFactory.getLogger(Sender.class);
  4. @Autowired
  5. private AmqpTemplate amqpTemplate;
  6. public void send(String msg){
  7. log.info("send message:"+msg);
  8. amqpTemplate.convertAndSend(RabbitMQConfig.QUEUE,msg);
  9. }
  10. }

        接收者Recevier.java,内容如下:

  1. @Component
  2. public class Receiver {
  3. private static Logger log = LoggerFactory.getLogger(Receiver.class);
  4. @RabbitListener(queues = RabbitMQConfig.QUEUE)
  5. public void receiver(String message) {
  6. log.info("recevier message:"+message);
  7. }
  8. }

        编写测试方法,内容如下:

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class SpringbootRabbitmqApplicationTests {
  4. @Autowired
  5. private Sender sender;
  6. @Test
  7. public void send() {
  8. sender.send("你好,我是老猫");
  9. }
  10. }

       可以清楚的看到,发送者只要发送消息,接收者就能立马收到。特别注意,他们的queue(队列)都是同一个!

2.交换机的四种模式实现:Direct Exchange模式:

        配置文件RabbitMQConfig .java,内容如下:

  1. @Configuration
  2. public class RabbitMQConfig {
  3. public static final String TOPIC_QUEUE1 = "topic.queue1";
  4. public static final String TOPIC_QUEUE2 = "topic.queue2";
  5. public static final String TOPIC_EXCHANGE = "topicExchange";
  6. /**
  7. * Topic模式
  8. */
  9. @Bean
  10. public Queue topicQueue1() {
  11. return new Queue(TOPIC_QUEUE1, true);
  12. }
  13. @Bean
  14. public Queue topicQueue2() {
  15. return new Queue(TOPIC_QUEUE2, true);
  16. }
  17. @Bean
  18. public TopicExchange topicExchange() {
  19. return new TopicExchange(TOPIC_EXCHANGE);
  20. }
  21. @Bean
  22. public Binding topicBinding1(){
  23. return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1");
  24. }
  25. @Bean
  26. public Binding topicBinding2(){
  27. return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
  28. }
  29. }

       发送者Sender.java,内容如下:

  1. @Component
  2. public class Sender {
  3. private static Logger log = LoggerFactory.getLogger(Sender.class);
  4. @Autowired
  5. private AmqpTemplate amqpTemplate;
  6. public void sendTopic(String msg) {
  7. log.info("send topic message:" + msg);
  8. amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "topic.key1", msg + 1);
  9. amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "topic.key2", msg + 2);
  10. }
  11. }

        接收者Recevier.java,内容如下:

  1. @Component
  2. public class Receiver {
  3. private static Logger log = LoggerFactory.getLogger(Receiver.class);
  4. @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE1)
  5. public void receiverTopic1(String message) {
  6. log.info("receiver topic queue1 message:"+message);
  7. }
  8. @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE2)
  9. public void receiverTopic2(String message) {
  10. log.info("receiver topic queue2 message:"+message);
  11. }
  12. }

        编写测试方法,内容如下:

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class SpringbootRabbitmqApplicationTests {
  4. @Autowired
  5. private Sender sender;
  6. @Test
  7. public void sendTopic() {
  8. sender.sendTopic("你好,我是老猫");
  9. }
  10. }

其中topic.#中的#代表匹配任意单词,所以我们的queue2就就收到了两次,queue1因为binding的是固定名称所以只接收了一次

3.交换机的四种模式实现:Fanout Exchange模式:

        配置文件RabbitMQConfig .java,内容如下:

  1. @Configuration
  2. public class RabbitMQConfig {
  3. public static final String FANOUT_QUEUE1 = "fanout.queue1";
  4. public static final String FANOUT_QUEUE2 = "fanout.queue2";
  5. public static final String FANOUT_EXCHANGE = "fanoutExchange";
  6. /**
  7. * Fanout模式
  8. */
  9. @Bean
  10. public Queue fanoutQueue1() {
  11. return new Queue(FANOUT_QUEUE1, true);
  12. }
  13. @Bean
  14. public Queue fanoutQueue2() {
  15. return new Queue(FANOUT_QUEUE2, true);
  16. }
  17. @Bean
  18. public FanoutExchange fanoutExchange() {
  19. return new FanoutExchange(FANOUT_EXCHANGE);
  20. }
  21. @Bean
  22. public Binding fanoutBinding1() {
  23. return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
  24. }
  25. @Bean
  26. public Binding fanoutBinding2() {
  27. return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
  28. }
  29. }

       发送者Sender.java,内容如下:

  1. @Component
  2. public class Sender {
  3. private static Logger log = LoggerFactory.getLogger(Sender.class);
  4. @Autowired
  5. private AmqpTemplate amqpTemplate;
  6. public void sendFanout(String msg){
  7. log.info("send fanout message:"+msg);
  8. amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE,"",msg);
  9. }
  10. }

        接收者Recevier.java,内容如下:

  1. @Component
  2. public class Receiver {
  3. private static Logger log = LoggerFactory.getLogger(Receiver.class);
  4. @RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE1)
  5. public void receiverFanout1(String message) {
  6. log.info("receiver fanout queue1 message:"+message);
  7. }
  8. @RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE2)
  9. public void receiverFanout2(String message) {
  10. log.info("receiver fanout queue2 message:"+message);
  11. }
  12. }

        编写测试方法,内容如下:

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class SpringbootRabbitmqApplicationTests {
  4. @Autowired
  5. private Sender sender;
  6. @Test
  7. public void sendFanout() {
  8. sender.sendFanout("你好,我是老猫");
  9. }
  10. }

4.交换机的四种模式实现:Fanout Exchange模式:

        配置文件RabbitMQConfig .java,内容如下:

  1. @Configuration
  2. public class RabbitMQConfig {
  3. public static final String HEADERS_QUEUE = "headers.queue";
  4. public static final String HEADERS_EXCHANGE = "headersExchange";
  5. /**
  6. * Headers模式
  7. */
  8. @Bean
  9. public HeadersExchange headersExchange() {
  10. return new HeadersExchange(HEADER_EXCHANGE);
  11. }
  12. @Bean
  13. public Queue headersQueue() {
  14. return new Queue(HEADERS_QUEUE, true);
  15. }
  16. @Bean
  17. public Binding headersBinding(){
  18. Map<String,Object> map=new HashMap<String,Object>();
  19. map.put("headers1","value1");
  20. map.put("headers2","value2");
  21. return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match();
  22. }
  23. }

       发送者Sender.java,内容如下:

  1. @Component
  2. public class Sender {
  3. private static Logger log = LoggerFactory.getLogger(Sender.class);
  4. @Autowired
  5. private AmqpTemplate amqpTemplate;
  6. public void sendHeaders(String msg){
  7. log.info("send headers message:"+msg);
  8. MessageProperties properties=new MessageProperties();
  9. properties.setHeader("headers1","value1");
  10. properties.setHeader("headers2","value2");
  11. Message message=new Message(msg.getBytes(),properties);
  12. amqpTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE,"", message);
  13. }
  14. }

        接收者Recevier.java,内容如下:

  1. @Component
  2. public class Receiver {
  3. private static Logger log = LoggerFactory.getLogger(Receiver.class);
  4. @RabbitListener(queues = MQConfig.HEADERS_QUEUE)
  5. public void receiverHeaderS(byte[] message) {
  6. log.info("receiver headers message:"+new String(message));
  7. }
  8. }

        编写测试方法,内容如下:

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class SpringbootRabbitmqApplicationTests {
  4. @Autowired
  5. private Sender sender;
  6. @Test
  7. public void sendHeaders(){
  8. sender.sendHeaders("你好,我是老猫");
  9. }
  10. }

源码地址https://gitee.com/xu0123/springboot2

相关文章