SpringBoot-RabbitMQ入门

x33g5p2x  于2021-09-22 转载在 Spring  
字(21.6k)|赞(0)|评价(0)|浏览(593)

SpringBoot-RabbitMQ入门

什么是RabbitMQ

Q全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

为什么使用MQ

在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高系统吞吐量
*
开发中消息队列通常有如下应用场景:

1、任务异步处理

将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

2、应用程序解耦合

MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

3、削峰填谷

如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。

解决办法:

消息被MQ保存起来,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了。

但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”

RabbitMQ各组件功能

//Broker:/*/*标识消息队列服务器实体.

//Virtual Host:/*/*虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。

//Exchange:/*/*交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

//Queue:/*/*消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

//Banding:/*/*绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

//Channel:/*/*信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

//Connection:/*/*网络连接,比如一个TCP连接。

//Publisher:/*/*消息的生产者,也是一个向交换器发布消息的客户端应用程序。

//Consumer:/*/*消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。

//Message:/*/*消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。

创建项目

可以在windows中安装RabbiMQ 也可以在Linux 中安装RabbiMQ 可以到百度上查怎么安装的 我写的文章中也有具体怎么安装的

安装后 在IDEA中创建一个 Maven项目

添加依赖 Maven

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.6.0</version>
  5. </dependency>

在编写代码前 要开启RabbitMQ的服务 否则是无法使用的

什么是生产者什么是消费者

就好比微信发消息 发消息的一方 是生产者 生产消息

而消费者 就是 获取消息的一方

学过Socket 通信的 就很好掌握

在Socket 中 客户端 就好比是 生产者 而 服务端就好比是消费者 但是不一样的是

服务端必须保证在线 否则客户端是无法发送消息的 也就是连接失败

而在RabbitMQ

利用 队列来存储 客户端(生产者) 发送的消息 (就好比数据库)

而服务端(消费者)开启后会自动获取 队列中 的全部消息

而队列就好比是中间商 来帮助 客户端和服务端 交互 不用 服务端一直开启

我们在来说说 微信 :

我们 在微信中 发生一条信息给 其他用户 但是那个用户不在线 信息发过去了没???

答案是没有, 那么存储在哪里了? , 存储在 你和他的队列里了, 当他上线后 自动会从队列中获取 你发送的全部消息

面我们就来实现 上面讲述的功能

编写RabbitMQ 连接

ConnectionUtil

  1. package com.itheima.rabbitmq.simple;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class ConnectionUtil {
  5. //队列名称
  6. public static final String QUEUE_NAME = "simple_queue";
  7. public static Connection getConnection() throws Exception {
  8. //创建连接工厂
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. //主机地址 如果是本机就是localhost 如果在其他 地方比如:虚拟机中 那么就是ip地址
  11. connectionFactory.setHost("192.168.216.128");
  12. //连接端口;默认为 5672
  13. connectionFactory.setPort(5672);
  14. //虚拟主机名称 就是和你用户绑定的虚拟机 在创建用户时候就指定了
  15. connectionFactory.setVirtualHost("/itcast");
  16. //连接用户名
  17. connectionFactory.setUsername("admin");
  18. //连接密码
  19. connectionFactory.setPassword("admin");
  20. //创建连接
  21. return connectionFactory.newConnection();
  22. }
  23. }

编写生产者

Producer

  1. package com.itheima.rabbitmq.simple;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. //生产者
  5. public class Producer {
  6. public static void main(String[] args) throws Exception {
  7. //创建连接
  8. Connection connection = ConnectionUtil.getConnection();
  9. // 创建频道
  10. Channel channel = connection.createChannel();
  11. // 声明(创建)队列
  12. /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */
  13. channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);
  14. // 要发送的信息
  15. String message = "你好;小兔子111!";
  16. /** * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */
  17. channel.basicPublish("", ConnectionUtil.QUEUE_NAME, null, message.getBytes());
  18. System.out.println("已发送消息:" + message);
  19. // 关闭资源
  20. channel.close();
  21. connection.close();
  22. }
  23. }

在执行上述的消息发送之后;可以登录RabbitMQ的管理控制台,可以发现队列和其消息:

我是在linux上搭建的 启动服务器后 在游览器上输入 http://192.168.216.128:15672 就能进入RabbitMQ

ip是linux 的ip地址 windows也一样

查看发送的消息信息

编写消费者

  1. package com.itheima.rabbitmq.simple;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. //消费者
  5. public class Consumer {
  6. public static void main(String[] args) throws Exception {
  7. Connection connection = ConnectionUtil.getConnection();
  8. // 创建频道
  9. Channel channel = connection.createChannel();
  10. // 声明(创建)队列
  11. /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */
  12. channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);
  13. //创建消费者;并设置消息处理
  14. DefaultConsumer consumer = new DefaultConsumer(channel){
  15. @Override
  16. /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */
  17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  18. //路由key
  19. System.out.println("路由key为:" + envelope.getRoutingKey());
  20. //交换机
  21. System.out.println("交换机为:" + envelope.getExchange());
  22. //消息id
  23. System.out.println("消息id为:" + envelope.getDeliveryTag());
  24. //收到的消息
  25. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
  26. }
  27. };
  28. //监听消息
  29. /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */
  30. channel.basicConsume(ConnectionUtil.QUEUE_NAME, true, consumer);
  31. //不关闭资源,应该一直监听消息
  32. //channel.close();
  33. //connection.close();
  34. }
  35. }

当启动消费者后 自动获取 生产者 发送到队列里的消息 而且持续保持监听

只要消费者接受到消息 那么 就会将队列里对应的消息删除

你也可以手动收到消息

  1. //手动确认消息
  2. channel.basicAck(envelope.getDeliveryTag(), true);

那么就要将 是否自动确认 关闭

  1. channel.basicConsume(ConnectionUtil.QUEUE_NAME, false, consumer);

具体怎么使用 看下面的案例

Work Queues(工作队列)

在上面的入门案例中我们 完成了 一个消费者对应一个生产者

这个案例中我们完成一个生产者对应多个消费者 来加快消息的处理

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

比如 抢购活动 下了1000个订单 我可以使用多个消费者来 分别处理这1000个订单

Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。

只是确认消息 需要手动 确定

结构图:

RabbitMQ连接上面有这里就不写了

生产者

Producer

  1. package com.itheima.rabbitmq.simple;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. public class Producer {
  5. public static void main(String[] args) throws Exception {
  6. //创建连接
  7. Connection connection = ConnectionUtil.getConnection();
  8. // 创建频道
  9. Channel channel = connection.createChannel();
  10. // 声明(创建)队列
  11. /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */
  12. channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);
  13. for (int i = 0; i < 30; i++) {
  14. // 要发送的信息
  15. String message = "你好;小兔子!---- "+i;
  16. /** * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */
  17. channel.basicPublish("", ConnectionUtil.QUEUE_NAME, null, message.getBytes());
  18. System.out.println("已发送消息:" + message);
  19. }
  20. // 关闭资源
  21. channel.close();
  22. connection.close();
  23. }
  24. }

一次生产 30条 "你好;小兔子!---- "+i;

可以到RabbitMQ的管理控制台 查看

消费者 1

Consumer1

  1. package com.itheima.rabbitmq.simple.Work;
  2. import com.itheima.rabbitmq.simple.ConnectionUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. //消费者 1
  6. public class Consumer1 {
  7. public static void main(String[] args) throws Exception {
  8. Connection connection = ConnectionUtil.getConnection();
  9. // 创建频道
  10. Channel channel = connection.createChannel();
  11. // 声明(创建)队列
  12. /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */
  13. channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);
  14. //一次只能接收并处理一个消息
  15. channel.basicQos(1);
  16. //创建消费者;并设置消息处理
  17. DefaultConsumer consumer = new DefaultConsumer(channel){
  18. @Override
  19. /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. //路由key
  22. System.out.println("路由key为:" + envelope.getRoutingKey());
  23. //交换机
  24. System.out.println("交换机为:" + envelope.getExchange());
  25. //消息id
  26. System.out.println("消息id为:" + envelope.getDeliveryTag());
  27. //收到的消息
  28. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
  29. // Thread.sleep(1000);
  30. //手动确认消息
  31. channel.basicAck(envelope.getDeliveryTag(), true);
  32. }
  33. };
  34. //监听消息
  35. /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */
  36. channel.basicConsume(ConnectionUtil.QUEUE_NAME, false, consumer);
  37. }
  38. }

消费者2

Consumer2

代码和Consumer1 一模一样 复制一下 改下文件名

启动消费者 1 和 消费者2 然后 运行 生产者

部分截图:

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

简单来说就是谁抢到就是谁的

Publish/Subscribe订阅模式

在之前的 模式中都是 一次只能将消息发送给一个队里 那么我们可以将消息发送给所有指定的队列里吗??

当前可以 我们可以使用交换机 来代替我,们发送 就和中间商一样 我们把消息给中间商,而中间商 帮助我们来发送消息给各个用户 这就是订阅模式

订阅模式是: 将某一个消费者 的消息发给多个队里 而队列里的所有消费者 都能共享到此消息

就拿微信群来说:

我在群中发送一条消息 只要是 群里的用户 都能接收到我的消息 这原理就是

发送者将消息交给 交换机 而交换机 在发送给 此群所有人的队列里 因为此群所有人的队列 都和此交换机绑定了

如果还不懂 那么 微信公众号 知道吧 如果你不关注他的公众号那么他发送的信息你是不会收到的

如果你关注了他的公众号那么 就相当于和此公众号的交换机绑定了 那么公众号 发生一条消息 给交换机,

此交换机就会将 消息发生给所有绑定此交换机的微信用户的队列里 当微信在线时候就会自动接收到消息

案例:

ConnectionUtil (连接)

  1. package com.itheima.rabbitmq.simple.ps;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class ConnectionUtil {
  5. //交换机名称
  6. static final String FANOUT_EXCHAGE = "fanout_exchange";
  7. //队列名称
  8. static final String FANOUT_QUEUE_1 = "fanout_queue_1";
  9. //队列名称
  10. static final String FANOUT_QUEUE_2 = "fanout_queue_2";
  11. public static Connection getConnection() throws Exception {
  12. //创建连接工厂
  13. ConnectionFactory connectionFactory = new ConnectionFactory();
  14. //主机地址 如果是本机就是localhost 如果在其他 地方比如:虚拟机中 那么就是ip地址
  15. connectionFactory.setHost("localhost");
  16. //连接端口;默认为 5672
  17. connectionFactory.setPort(5672);
  18. //虚拟主机名称 就是和你用户绑定的虚拟机
  19. connectionFactory.setVirtualHost("/");
  20. //连接用户名
  21. connectionFactory.setUsername("guest");
  22. //连接密码
  23. connectionFactory.setPassword("guest");
  24. //创建连接
  25. return connectionFactory.newConnection();
  26. }
  27. }

生产者

Producer

  1. package com.itheima.rabbitmq.simple.ps;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. /** * 发布与订阅使用的交换机类型为:fanout */
  6. public class Producer { //生产者
  7. public static void main(String[] args) throws Exception {
  8. //创建连接
  9. Connection connection = ConnectionUtil.getConnection();
  10. // 创建频道
  11. Channel channel = connection.createChannel();
  12. /** * 创建交换机 * 参数1:交换机名称 * 参数2:交换机类型,fanout、topic、direct、headers */
  13. channel.exchangeDeclare(ConnectionUtil.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
  14. // 声明(创建)队列
  15. /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */
  16. channel.queueDeclare(ConnectionUtil.FANOUT_QUEUE_1, true, false, false, null);
  17. channel.queueDeclare(ConnectionUtil.FANOUT_QUEUE_2, true, false, false, null);
  18. //队列绑定交换机 也就是每次消息要推送的队列
  19. channel.queueBind(ConnectionUtil.FANOUT_QUEUE_1, ConnectionUtil.FANOUT_EXCHAGE, "");
  20. channel.queueBind(ConnectionUtil.FANOUT_QUEUE_2, ConnectionUtil.FANOUT_EXCHAGE, "");
  21. for (int i = 1; i <= 10; i++) {
  22. // 发送信息
  23. String message = "你好;小兔子!发布订阅模式--" + i;
  24. /** * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */
  25. channel.basicPublish(ConnectionUtil.FANOUT_EXCHAGE, "", null, message.getBytes());
  26. System.out.println("已发送消息:" + message);
  27. }
  28. // 关闭资源
  29. channel.close();
  30. connection.close();
  31. }
  32. }

消费者1

  1. package com.itheima.rabbitmq.simple.ps;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. public class Consumer1 { //消费者1
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = ConnectionUtil.getConnection();
  7. // 创建频道
  8. Channel channel = connection.createChannel();
  9. //创建交换机
  10. channel.exchangeDeclare(ConnectionUtil.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
  11. // 声明(创建)队列
  12. /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */
  13. channel.queueDeclare(ConnectionUtil.FANOUT_QUEUE_1, true, false, false, null);
  14. //队列绑定交换机
  15. channel.queueBind(ConnectionUtil.FANOUT_QUEUE_1, ConnectionUtil.FANOUT_EXCHAGE, "");
  16. //创建消费者;并设置消息处理
  17. DefaultConsumer consumer = new DefaultConsumer(channel){
  18. @Override
  19. /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. //路由key
  22. System.out.println("路由key为:" + envelope.getRoutingKey());
  23. //交换机
  24. System.out.println("交换机为:" + envelope.getExchange());
  25. //消息id
  26. System.out.println("消息id为:" + envelope.getDeliveryTag());
  27. //收到的消息
  28. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
  29. }
  30. };
  31. //监听消息
  32. /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */
  33. channel.basicConsume(ConnectionUtil.FANOUT_QUEUE_1, true, consumer);
  34. }
  35. }

消费者2

  1. package com.itheima.rabbitmq.simple.ps;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. public class Consumer2 { //消费者2
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = ConnectionUtil.getConnection();
  7. // 创建频道
  8. Channel channel = connection.createChannel();
  9. //创建交换机
  10. channel.exchangeDeclare(ConnectionUtil.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
  11. // 声明(创建)队列
  12. /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */
  13. channel.queueDeclare(ConnectionUtil.FANOUT_QUEUE_2, true, false, false, null);
  14. //队列绑定交换机
  15. channel.queueBind(ConnectionUtil.FANOUT_QUEUE_2, ConnectionUtil.FANOUT_EXCHAGE, "");
  16. //创建消费者;并设置消息处理
  17. DefaultConsumer consumer = new DefaultConsumer(channel){
  18. @Override
  19. /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. //路由key
  22. System.out.println("路由key为:" + envelope.getRoutingKey());
  23. //交换机
  24. System.out.println("交换机为:" + envelope.getExchange());
  25. //消息id
  26. System.out.println("消息id为:" + envelope.getDeliveryTag());
  27. //收到的消息
  28. System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
  29. }
  30. };
  31. //监听消息
  32. /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */
  33. channel.basicConsume(ConnectionUtil.FANOUT_QUEUE_2, true, consumer);
  34. }
  35. }

先运行 所有消费者 然后在运行 生产者

控制台结果:

可以看出来 交换机成功的把 消息 发送到两个队列里了

然后我们在来看看 RabbiMQ 控制台里

这个是我们创建的交换机

点击fanout_exchange 进入里面

这就是我们绑定的队列 看看就行了别乱点

我们在看看队列

这就是我们创建的队列 我们点击fanout_queue_1进入

这个就是此队列绑定的交换机

RoutingKEY路由模式

什么是路由模式:

就是在订阅模式的基础上 加个RoutingKEY标记这个 标记的作用是 来区分消息的发送 就和送快递一样 根据地址将快递送给对应的人 也可以说是分流

我们还拿微信来说:

就比如一个企业群:

普通消息是所有人都能接收到 而有些消息只能管理层才能接收到

比如:通知经理以上员工来开会

这就需要管理层关联管理队列和普通队列 普通员工关联普通队列

在比如商品发布:

商品分为会员商品和普通商品

会员能接受到普通商品的发布和会员商品的发布信息

而普通用户只能接受到普通商品的发布信息

商品发布案例:

ConnectionUtil(连接)

  1. package com.itheima.rabbitmq.simple.direct;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class ConnectionUtil {
  5. //交换机名称
  6. static final String DIRECT_EXCHAGE = "direct_exchange";
  7. //会员队列名称
  8. static final String DIRECT_QUEUE_MEMBER = "direct_queue_member";
  9. //会员路由
  10. static final String DIRECT_ROUTING_MEMBER = "member";
  11. //普通队列名称
  12. static final String DIRECT_QUEUE_COMMON = "direct_queue_common";
  13. //普通路由
  14. static final String DIRECT_ROUTING_COMMON = "common";
  15. public static Connection getConnection() throws Exception {
  16. //创建连接工厂
  17. ConnectionFactory connectionFactory = new ConnectionFactory();
  18. //主机地址 如果是本机就是localhost 如果在其他 地方比如:虚拟机中 那么就是ip地址
  19. connectionFactory.setHost("localhost");
  20. //连接端口;默认为 5672
  21. connectionFactory.setPort(5672);
  22. //虚拟主机名称 就是和你用户绑定的虚拟机
  23. connectionFactory.setVirtualHost("/");
  24. //连接用户名
  25. connectionFactory.setUsername("guest");
  26. //连接密码
  27. connectionFactory.setPassword("guest");
  28. //创建连接
  29. return connectionFactory.newConnection();
  30. }
  31. }

生产者

Producer

  1. package com.itheima.rabbitmq.simple.direct;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. public class Producer { //生产者
  6. public static void main(String[] args) throws Exception {
  7. //创建连接
  8. Connection connection = ConnectionUtil.getConnection();
  9. // 创建频道
  10. Channel channel = connection.createChannel();
  11. /** * 创建交换机 * 参数1:交换机名称 * 参数2:交换机类型,fanout、topic、direct、headers */
  12. channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
  13. // 声明(创建)队列
  14. /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */
  15. channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, false, false, null);
  16. channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_COMMON , true, false, false, null);
  17. //队列绑定交换机 和指定 Routingkey
  18. channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER);
  19. channel.queueBind(ConnectionUtil.DIRECT_QUEUE_COMMON , ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);
  20. // 发送信息
  21. String message = "新增了会员商品。路由模式;routing key 为 "+ConnectionUtil.DIRECT_ROUTING_MEMBER ;
  22. /** * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */
  23. channel.basicPublish(ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER, null, message.getBytes());
  24. System.out.println("已发送消息:" + message);
  25. // 发送信息
  26. message = "新增了普通商品。路由模式;routing key 为 "+ConnectionUtil.DIRECT_ROUTING_COMMON ;
  27. /** * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */
  28. channel.basicPublish(ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON, null, message.getBytes());
  29. System.out.println("已发送消息:" + message);
  30. // 关闭资源
  31. channel.close();
  32. connection.close();
  33. }
  34. }

会员消费者

  1. package com.itheima.rabbitmq.simple.direct;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. public class Consumer1 {//会员消费者
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = ConnectionUtil.getConnection();
  7. // 创建频道
  8. Channel channel = connection.createChannel();
  9. //创建交换机
  10. channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
  11. // 声明(创建)队列
  12. /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */
  13. channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, false, false, null);
  14. //队列绑定交换机 和指定 Routingkey
  15. channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER);//会员Routing
  16. channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);//普通Routing
  17. //创建消费者;并设置消息处理
  18. DefaultConsumer consumer = new DefaultConsumer(channel){
  19. @Override
  20. /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */
  21. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22. //路由key
  23. System.out.println("路由key为:" + envelope.getRoutingKey());
  24. //交换机
  25. System.out.println("交换机为:" + envelope.getExchange());
  26. //消息id
  27. System.out.println("消息id为:" + envelope.getDeliveryTag());
  28. //收到的消息
  29. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
  30. }
  31. };
  32. //监听消息
  33. /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */
  34. channel.basicConsume(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, consumer);
  35. }
  36. }

普通消费者

  1. package com.itheima.rabbitmq.simple.direct;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. public class Consumer2 {//普通消费者
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = ConnectionUtil.getConnection();
  7. // 创建频道
  8. Channel channel = connection.createChannel();
  9. //创建交换机
  10. channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
  11. // 声明(创建)队列
  12. /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */
  13. channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_COMMON , true, false, false, null);
  14. //队列绑定交换机 和指定 Routingkey
  15. channel.queueBind(ConnectionUtil.DIRECT_QUEUE_COMMON , ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);//普通Routing
  16. //创建消费者;并设置消息处理
  17. DefaultConsumer consumer = new DefaultConsumer(channel){
  18. @Override
  19. /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. //路由key
  22. System.out.println("路由key为:" + envelope.getRoutingKey());
  23. //交换机
  24. System.out.println("交换机为:" + envelope.getExchange());
  25. //消息id
  26. System.out.println("消息id为:" + envelope.getDeliveryTag());
  27. //收到的消息
  28. System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
  29. }
  30. };
  31. //监听消息
  32. /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */
  33. channel.basicConsume(ConnectionUtil.DIRECT_QUEUE_COMMON , true, consumer);
  34. }
  35. }

先运行 所有消费者 然后 在运行生产者

控制台结果:

会员

普通

大家可能会发现 在Producer类中和Consumer1已经Consumer2 中都存在重复创建队列和交换机代码

channel.exchangeDeclare(xxx) 创建交换机
channel.queueBind(xxx) 创建队列

那么能不能省略呢 答案是能 但是这样的话 你必须保证 发送消息的目标队列 必须存在 否则 消息将丢失

所以还是不要省略为好 这样在发送消息之前就将需要的队列创建完成了 这样保证了消息不会丢失

Topics通配符模式

Topics通配符模式是在RoutingKey路由模式的基础上升级的 增加了通配符功能

在RoutingKey路由模式中我们要 分流发送信息 需要指定 每一个队列的RoutingKey 如果是100个队列呢

那不给累死…

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: group.member1

通配符规则:

/#:匹配一个或多个词 (最常用)

/*:匹配不多不少恰好1个词

举例:

group./#:能够匹配group.xxx 或者 group.xxx.xx

group./*:只能匹配group.xxx

我们将RoutingKey路由模式的代码改动下:

将ConnectionUtil类中的 路由变量换成以下代码

  1. //会员路由
  2. static final String DIRECT_ROUTING_MEMBER = "item.member";
  3. //普通路由
  4. static final String DIRECT_ROUTING_COMMON = "item.common";

将 Consumer1 这个类的代码换成下面代码

Consumer1

  1. package com.itheima.rabbitmq.simple.direct;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. public class Consumer1 {//会员消费者
  5. public static void main(String[] args) throws Exception {
  6. Connection connection = ConnectionUtil.getConnection();
  7. // 创建频道
  8. Channel channel = connection.createChannel();
  9. //声明交换机
  10. channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
  11. // 声明(创建)队列
  12. /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */
  13. channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, false, false, null);
  14. //队列绑定交换机 和指定Routing
  15. channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, "item.*");//会员Routing
  16. //创建消费者;并设置消息处理
  17. DefaultConsumer consumer = new DefaultConsumer(channel){
  18. @Override
  19. /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. //路由key
  22. System.out.println("路由key为:" + envelope.getRoutingKey());
  23. //交换机
  24. System.out.println("交换机为:" + envelope.getExchange());
  25. //消息id
  26. System.out.println("消息id为:" + envelope.getDeliveryTag());
  27. //收到的消息
  28. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
  29. }
  30. };
  31. //监听消息
  32. /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */
  33. channel.basicConsume(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, consumer);
  34. }
  35. }

其他代码都一样 我就是把多个channel.queueBind 删除 只留一个 然后把 第三个参数变成统配符

效果等同于上面案例 , 这里就不截图了

相关文章