RocketMQ进击(三)顺序消息与高速公路收费站

x33g5p2x  于2021-12-19 转载在 其他  
字(8.7k)|赞(0)|评价(0)|浏览(358)

楔子:在信息高速公路上,我们开着大大小小的车辆,我们或快或慢高速飞驰,东南西北,日月星辰,我们要经过收费站服务区,我们要选择缴费窗口并减速排队缴费才能顺利通过。MQ的顺序消息也是这样。

1. 日常排队经验

也许我们经常有这样的生活经验:

  1. 在大型超市购物结算时,你最终只能在一个结算口进行排队结算,即先进先出(这里排除插队搞事情现象)
  2. 在高速上过收费站时,在同一窗口,先进队的车一定是先缴完费出去,即先进先出(这里排除插队搞事情现象)
  3. 在机场出关时,你只能在一个队列,你也会比在你后面的人先过安检,即先进先出(这里排除插队搞事情现象)

顺序消息(First Input First Output,FIFO 消息)是消息队列 MQ 提供的一种严格按照顺序来发布和消费的消息**。**顺序发布和顺序消费是指对于指定的一个 Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。

2. 浅探顺序消息

顺序消息分为全局顺序消息分区顺序消息

在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。

当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。

2.1. 全局顺序消息

对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。

**适用场景:**适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。

**示例:**在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。

2.2. 分区顺序消息

对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

**适用场景:**适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照 FIFO 原则进行消息发布和消费的场景。

**示例:**电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

2.3. 源码与案例

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个 OrderId 获取到的肯定是同一个队列。

分区顺序消息生产者(Producer )

package com.meiwei.service.mq.tcp.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * 顺序消息 - 生产者
 *
 * 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
 */
public class OrderMqProducer {

    // Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
    // Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";

    // Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
    private static final String MQ_CONFIG_TAG_A = "PID_MEIWEI_SMS_ORDER_A";
    private static final String MQ_CONFIG_TAG_B = "PID_MEIWEI_SMS_ORDER_B";
    private static final String MQ_CONFIG_TAG_C = "PID_MEIWEI_SMS_ORDER_C";

    public static void main(String[] args) throws Exception {
        // 声明并实例化一个 producer 生产者来产生消息
        // 需要一个 producer group 名字作为构造方法的参数
        DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-orderdmq");

        // 指定 NameServer 地址列表,多个nameServer地址用半角分号隔开。此处应改为实际 NameServer 地址
        // NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();

        // 二级分类标签
        String[] tags = new String[] {MQ_CONFIG_TAG_A, MQ_CONFIG_TAG_B, MQ_CONFIG_TAG_C};

        List<OrderStep> orderList = new OrderMqProducer().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);

        for (int i = 0; i < orderList.size(); i++) {
            // 加个时间缀
            String content = "【MQ测试消息】顺序消息, 时间 " + dateStr + " " + orderList.get(i);

            // 新建一条消息,指定topic,tag、key和body
            // KEY 就好比具体某个班级,唯一
            Message message = new Message(MQ_CONFIG_TOPIC, tags[i % tags.length], "KEY" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET));

            // 提交消息,制定 queue 选择器和排序参数
            // 做了一个取模运算再丢到 selector 中,selector 保证同一个模的都会投递到同一条 queue
            // 即:相同订单号的 有相同的模 有相同的 queue
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    // 根据订单id选择发送queue
                    Long id = (Long) o;
                    long index = id % list.size();
                    return list.get((int) index);
                }
            }, orderList.get(i).getOrderId()); // 订单id

            // 日志打印
            System.out.printf("Send MQ message success! Topic: %s,Tag: %s, Message: %s %n",
                    message.getTopic(), message.getTags(), new String(message.getBody()));
        }

        // 在发送完消息之后,销毁 Producer 对象。如果不销毁也没有问题
        producer.shutdown();
    }

    /**
     * 订单的步骤
     */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /**
     * 生成模拟订单数据
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

Producer 生产端确保消息顺序,唯一要做的事情就是将消息路由到特定的分区。在 RocketMQ 中,通过 MessageQueueSelector 来实现分区的选择。

  • List<MessageQueue> list:消息要发送的 Topic 下所有的分区
  • Message message:消息对象
  • 额外参数:用户可以传递自己的参数

分区顺序消息消费者(Consumer )【Push模式】

package com.meiwei.service.mq.tcp.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 顺序消息 - 消费者
 *
 * 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
 */
public class OrderMqConsumer {

    // Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
    // Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";

    // Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
    private static final String MQ_CONFIG_TAG_A = "PID_MEIWEI_SMS_ORDER_A";
    private static final String MQ_CONFIG_TAG_B = "PID_MEIWEI_SMS_ORDER_B";
    private static final String MQ_CONFIG_TAG_C = "PID_MEIWEI_SMS_ORDER_C";

    public static void main(String[] args) throws Exception {
        // 声明并初始化一个 consumer
        // 需要一个 consumer group 名字作为构造方法的参数
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-ordermq");

        // 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 这里设置的是一个consumer的消费策略
        // CONSUME_FROM_LAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息
        // CONSUME_FROM_FIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
        // CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,和setConsumeTimestamp() 配合使用,默认是半个小时以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
        consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_A + " || " + MQ_CONFIG_TAG_B + " || " + MQ_CONFIG_TAG_C);

        // 设置一个Listener,主要进行消息的逻辑处理
        // 注意这里使用的是 MessageListenerOrderly 这个接口来实现顺序消费
        consumer.registerMessageListener(new MessageListenerOrderly() {

            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                // 设置自动提交
                consumeOrderlyContext.setAutoCommit(true);

                list.forEach(mq->{
                    System.out.printf("Thread: %s, Topic: %s, Tags: %s, Message: %s",
                            Thread.currentThread().getName(),
                            mq.getTopic(),
                            mq.getTags(),
                            new String(mq.getBody()));
                    System.out.println();
                });

                try {
                    //模拟业务逻辑处理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }

                // 返回消费状态
                // SUCCESS 消费成功
                // SUSPEND_CURRENT_QUEUE_A_MOMENT 消费失败,暂停当前队列的消费
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // 调用 start() 方法启动 consumer
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

2.4. 测试及结果

分区顺序消息生产者(Producer)发送结果:

分区顺序消息消费者(Consumer)消费结果:

顺序消息缺陷

  • 发送顺序消息无法利用集群 FailOver 特性
  • 消费顺序消息的并行度依赖于队列数量
  • 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题
  • 遇到消息失败的消息,无法跳过,当前队列消费暂停

2.5. 全局与分区对比

消息类型对比

Topic 的消息类型是否支持事务消息是否支持定时/延时消息性能
无序消息(普通、事务、定时/延时消息)最高
分区顺序消息
全局顺序消息一般

发送方式对比

消息类型是否支持可靠同步发送是否支持可靠异步发送是否支持 Oneway 发送
无序消息(普通、事务、定时/延时消息)
分区顺序消息
全局顺序消息

参考资料
阿里云:https://help.aliyun.com/document_detail/49319.html?spm=a2c4g.11186623.6.553.5dd918fdfmtTSh

相关文章