RabbitMQ05_订阅直连(Direct)模型

x33g5p2x  于2021-12-19 转载在 其他  
字(2.3k)|赞(0)|评价(0)|浏览(420)

RabbitMQ05_订阅直连(Direct)模型

fanout 广播模型实现了一条消息被所有订阅的队列消费
但在某些场景下,我们希望不同的消息被不同的队列消费

在 Direct 模型中,队列与交换机绑定时需指定一个 RoutingKey,消息发送者向交换机发送消息时需指定消息的 RoutingKey,交换机将根据消息和队列的 RoutingKey 匹配结果决定发送给哪些队列

代码示例:

  • 消息生产者:
public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //通过通道声明交换机
    //参数1:交换机名称(自定义) 参数2:路由(直连)模式
    channel.exchangeDeclare("logs_Direct", "direct");
    //发送消息
    String routingkey = "info";
    channel.basicPublish("logs_Direct", routingkey, null, ("这是Direct模型发布的基于route key:["+routingkey+"]的消息").getBytes());
    RabbitMQUtils.closeConnectionAndChannel(channel, connection);
}
  • 消息消费者1(只接收 routingkey 为 error 的消息):
public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("logs_Direct", "direct");
    //创建临时队列
    String queue = channel.queueDeclare().getQueue();
    //基于路由key绑定队列和交换机
    channel.queueBind(queue, "logs_Direct", "error");
    channel.basicConsume(queue, true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
            System.out.println("消费者1:"+new String(body));
        }
    });

}
  • 消息消费者2(接收 routingkey 为 info、error、warning 的消息):
public static void main(String[] args) throws IOException {
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("logs_Direct", "direct");
    //创建临时队列
    String queue = channel.queueDeclare().getQueue();
    //基于路由key绑定队列和交换机
    channel.queueBind(queue, "logs_Direct", "info");
	channel.queueBind(queue, "logs_Direct", "error");
	channel.queueBind(queue, "logs_Direct", "warning");
    channel.basicConsume(queue, true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
            System.out.println("消费者1:"+new String(body));
        }
    });

}

测试让消息生产者分别生产一个 info、error、warning 消息,结果:

消费者1只接收到 error 消息,而消费者2接收到了三个消息:

消费者1:这是Direct模型发布的基于route key:[error]的消息
消费者2:这是Direct模型发布的基于route key:[info]的消息
消费者2:这是Direct模型发布的基于route key:[error]的消息
消费者2:这是Direct模型发布的基于route key:[warning]的消息
上一篇:fanout广播模型
下一篇:Topic模型

相关文章