RabbitMQ05_订阅直连(Direct)模型

x33g5p2x  于2021-12-19 转载在 其他  
字(2.3k)|赞(0)|评价(0)|浏览(530)

RabbitMQ05_订阅直连(Direct)模型

fanout 广播模型实现了一条消息被所有订阅的队列消费
但在某些场景下,我们希望不同的消息被不同的队列消费

在 Direct 模型中,队列与交换机绑定时需指定一个 RoutingKey,消息发送者向交换机发送消息时需指定消息的 RoutingKey,交换机将根据消息和队列的 RoutingKey 匹配结果决定发送给哪些队列

代码示例:

  • 消息生产者:
  1. public static void main(String[] args) throws IOException {
  2. Connection connection = RabbitMQUtils.getConnection();
  3. Channel channel = connection.createChannel();
  4. //通过通道声明交换机
  5. //参数1:交换机名称(自定义) 参数2:路由(直连)模式
  6. channel.exchangeDeclare("logs_Direct", "direct");
  7. //发送消息
  8. String routingkey = "info";
  9. channel.basicPublish("logs_Direct", routingkey, null, ("这是Direct模型发布的基于route key:["+routingkey+"]的消息").getBytes());
  10. RabbitMQUtils.closeConnectionAndChannel(channel, connection);
  11. }
  • 消息消费者1(只接收 routingkey 为 error 的消息):
  1. public static void main(String[] args) throws IOException {
  2. Connection connection = RabbitMQUtils.getConnection();
  3. Channel channel = connection.createChannel();
  4. channel.exchangeDeclare("logs_Direct", "direct");
  5. //创建临时队列
  6. String queue = channel.queueDeclare().getQueue();
  7. //基于路由key绑定队列和交换机
  8. channel.queueBind(queue, "logs_Direct", "error");
  9. channel.basicConsume(queue, true, new DefaultConsumer(channel){
  10. @Override
  11. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
  12. throws IOException {
  13. System.out.println("消费者1:"+new String(body));
  14. }
  15. });
  16. }
  • 消息消费者2(接收 routingkey 为 info、error、warning 的消息):
  1. public static void main(String[] args) throws IOException {
  2. Connection connection = RabbitMQUtils.getConnection();
  3. Channel channel = connection.createChannel();
  4. channel.exchangeDeclare("logs_Direct", "direct");
  5. //创建临时队列
  6. String queue = channel.queueDeclare().getQueue();
  7. //基于路由key绑定队列和交换机
  8. channel.queueBind(queue, "logs_Direct", "info");
  9. channel.queueBind(queue, "logs_Direct", "error");
  10. channel.queueBind(queue, "logs_Direct", "warning");
  11. channel.basicConsume(queue, true, new DefaultConsumer(channel){
  12. @Override
  13. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
  14. throws IOException {
  15. System.out.println("消费者1:"+new String(body));
  16. }
  17. });
  18. }

测试让消息生产者分别生产一个 info、error、warning 消息,结果:

消费者1只接收到 error 消息,而消费者2接收到了三个消息:

  1. 消费者1:这是Direct模型发布的基于route key:[error]的消息
  1. 消费者2:这是Direct模型发布的基于route key:[info]的消息
  2. 消费者2:这是Direct模型发布的基于route key:[error]的消息
  3. 消费者2:这是Direct模型发布的基于route key:[warning]的消息
上一篇:fanout广播模型
下一篇:Topic模型

相关文章