万字 +图片解析死信队列和死信实战演练

x33g5p2x  于2021-12-28 转载在 其他  
字(9.5k)|赞(0)|评价(0)|浏览(463)

📒博客首页:崇尚学技术的科班人
🏇小肖来了
🍣今天给大家带来的文章是《万字 +图片解析死信队列和死信实战演练》🍣
🍣有的小伙伴可能会问死信队列有啥用?你看了这篇文章就知道了🍣
🍣希望各位小伙伴们能够耐心的读完这篇文章🍣
🙏博主也在学习阶段,如若发现问题,请告知,非常感谢🙏
💗同时也非常感谢各位小伙伴们的支持💗

1、死信队列

1.1、概念

  • 死信:就是无法被消费的消息。由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
  • 应用场景:保证订单业务的消息数据不丢失,当消息发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

1.2、死信来源

  1. 消息TTL过期
  2. 队列达到最大长度(队列满了,无法再添加数据到队列中)。
  3. 消息被拒绝并且requeue = false

1.3、死信实战

1.3.1、代码架构图

1.3.2、TTL过期情况

1. 消费者01

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer01 {
    public static final String DEAD_EXCHANGE = "dead_exchange";

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String DEAD_QUEUE = "dead_queue";

    public static final String NORMAL_QUEUE = "normal_queue";

    /** * 死信实战 * 消费者01 * @param args * @throws Exception */

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        map.put("x-dead-letter-routing-key","lisi");
        map.put("x-message-ttl",10000);

        // 普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        // 队列绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 最为复杂的就是消费者01,它需要进行 死信交换机绑定死信队列普通交换机绑定普通队列普通队列绑定死信交换机
  • 我们为了让消息不被消费,我们需要制造假死现象,也就是关闭消费者01

2. 消费者02

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer02 {
    /** * 消费者02 */

    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});

    }
}

3. 生产者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        // 单位是毫秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for(int i = 1; i < 11; i ++){
            String message = "info" + i;
            // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}

4. 测试结果

  • 所有的消息在超过过期时间之后,全部转移到了死信队列中

1.3.3、队列达到最大长度情况

1. 消费者01

public class Consumer01 {
    public static final String DEAD_EXCHANGE = "dead_exchange";

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String DEAD_QUEUE = "dead_queue";

    public static final String NORMAL_QUEUE = "normal_queue";

    /** * 死信实战 * 消费者01 * @param args * @throws Exception */

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        map.put("x-dead-letter-routing-key","lisi");
        map.put("x-max-length",6);
        //map.put("x-message-ttl",10000);

        // 普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        // 队列绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 这里我们将过期时间参数改为了队列最大长度
  • 我们为了让消息不被消费和观察到明显现象,我们需要制造假死现象,也就是关闭消费者01

2. 消费者02

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer02 {
    /** * 消费者02 */

    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 消费者02和TTL过期情况下的一模一样

3. 生产者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        // 单位是毫秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for(int i = 1; i < 11; i ++){
            String message = "info" + i;
            // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}
  • 我们将对应的设置过期时间注释掉

4. 测试结果

  • 如果我们启动消费者01会报错,那是因为我们所创建的队列已经存在,我们需要把普通队列删除,因为只有它的参数发生了改变

  • 因为我们设置了普通队列的最大长度6,所以当超过了最大长度的消息都会被作为死信

1.3.4、消息被拒情况

1. 消费者01

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer01 {
    public static final String DEAD_EXCHANGE = "dead_exchange";

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static final String DEAD_QUEUE = "dead_queue";

    public static final String NORMAL_QUEUE = "normal_queue";

    /** * 死信实战 * 消费者01 * @param args * @throws Exception */

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        map.put("x-dead-letter-routing-key","lisi");
        //map.put("x-max-length",6);
        //map.put("x-message-ttl",10000);

        // 普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        // 队列绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            String msg = new String(var2.getBody(),"UTF-8");
            if(msg.equals("info5")){
                System.out.println("Consumer01控制台接收到的消息是:" + msg + ": 此消息被拒" );
                channel.basicReject(var2.getEnvelope().getDeliveryTag(),false);
            }else{
                System.out.println("Consumer01控制台接收到的消息是:" + msg);
                channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
            }

        };
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,var1->{});

    }
}
  • 这里我们将队列最大长度注释掉
  • 我们还需要开启手动应答,因为不开启就不会存在消息被拒 的问题。

2. 消费者02

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class Consumer02 {
    /** * 消费者02 */

    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
        };
        // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});

    }
}
  • 消费者02和队列达到最大长度情况下的一模一样

3. 生产者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        // 单位是毫秒
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for(int i = 1; i < 11; i ++){
            String message = "info" + i;
            // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}
  • 生产者和队列达到最大长度情况下的一模一样

4. 测试结果

  • 测试之前我们需要将队列中的消息消费掉,并且需要将普通队列删除。

  • 可见只有"info5"被作为死信。

2、总结

  • 如果觉得这篇文章对你有帮助的话,请给我一个五星好评呗。评论地址,感谢铁汁的支持!!!

相关文章