文章11 | 阅读 6387 | 点赞0
楔子:在信息高速公路上,我们开着大大小小的车辆,我们或快或慢高速飞驰,东南西北,日月星辰,我们要经过收费站服务区,我们要选择缴费窗口并减速排队缴费才能顺利通过。MQ的顺序消息也是这样。
也许我们经常有这样的生活经验:
顺序消息(First Input First Output,FIFO 消息)是消息队列 MQ 提供的一种严格按照顺序来发布和消费的消息**。**顺序发布和顺序消费是指对于指定的一个 Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。
顺序消息分为全局顺序消息和分区顺序消息。
在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。
当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
**适用场景:**适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
**示例:**在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。
对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
**适用场景:**适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照 FIFO 原则进行消息发布和消费的场景。
**示例:**电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个 OrderId 获取到的肯定是同一个队列。
分区顺序消息生产者(Producer )
package com.meiwei.service.mq.tcp.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* 顺序消息 - 生产者
*
* 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
*/
public class OrderMqProducer {
// Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
// Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
// Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
private static final String MQ_CONFIG_TAG_A = "PID_MEIWEI_SMS_ORDER_A";
private static final String MQ_CONFIG_TAG_B = "PID_MEIWEI_SMS_ORDER_B";
private static final String MQ_CONFIG_TAG_C = "PID_MEIWEI_SMS_ORDER_C";
public static void main(String[] args) throws Exception {
// 声明并实例化一个 producer 生产者来产生消息
// 需要一个 producer group 名字作为构造方法的参数
DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-orderdmq");
// 指定 NameServer 地址列表,多个nameServer地址用半角分号隔开。此处应改为实际 NameServer 地址
// NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
producer.setNamesrvAddr("127.0.0.1:9876");
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
// 二级分类标签
String[] tags = new String[] {MQ_CONFIG_TAG_A, MQ_CONFIG_TAG_B, MQ_CONFIG_TAG_C};
List<OrderStep> orderList = new OrderMqProducer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < orderList.size(); i++) {
// 加个时间缀
String content = "【MQ测试消息】顺序消息, 时间 " + dateStr + " " + orderList.get(i);
// 新建一条消息,指定topic,tag、key和body
// KEY 就好比具体某个班级,唯一
Message message = new Message(MQ_CONFIG_TOPIC, tags[i % tags.length], "KEY" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 提交消息,制定 queue 选择器和排序参数
// 做了一个取模运算再丢到 selector 中,selector 保证同一个模的都会投递到同一条 queue
// 即:相同订单号的 有相同的模 有相同的 queue
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
// 根据订单id选择发送queue
Long id = (Long) o;
long index = id % list.size();
return list.get((int) index);
}
}, orderList.get(i).getOrderId()); // 订单id
// 日志打印
System.out.printf("Send MQ message success! Topic: %s,Tag: %s, Message: %s %n",
message.getTopic(), message.getTags(), new String(message.getBody()));
}
// 在发送完消息之后,销毁 Producer 对象。如果不销毁也没有问题
producer.shutdown();
}
/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
Producer 生产端确保消息顺序,唯一要做的事情就是将消息路由到特定的分区。在 RocketMQ 中,通过 MessageQueueSelector 来实现分区的选择。
分区顺序消息消费者(Consumer )【Push模式】
package com.meiwei.service.mq.tcp.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 顺序消息 - 消费者
*
* 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
*/
public class OrderMqConsumer {
// Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
// Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
// Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
private static final String MQ_CONFIG_TAG_A = "PID_MEIWEI_SMS_ORDER_A";
private static final String MQ_CONFIG_TAG_B = "PID_MEIWEI_SMS_ORDER_B";
private static final String MQ_CONFIG_TAG_C = "PID_MEIWEI_SMS_ORDER_C";
public static void main(String[] args) throws Exception {
// 声明并初始化一个 consumer
// 需要一个 consumer group 名字作为构造方法的参数
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-ordermq");
// 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
consumer.setNamesrvAddr("127.0.0.1:9876");
// 这里设置的是一个consumer的消费策略
// CONSUME_FROM_LAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息
// CONSUME_FROM_FIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
// CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,和setConsumeTimestamp() 配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_A + " || " + MQ_CONFIG_TAG_B + " || " + MQ_CONFIG_TAG_C);
// 设置一个Listener,主要进行消息的逻辑处理
// 注意这里使用的是 MessageListenerOrderly 这个接口来实现顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
// 设置自动提交
consumeOrderlyContext.setAutoCommit(true);
list.forEach(mq->{
System.out.printf("Thread: %s, Topic: %s, Tags: %s, Message: %s",
Thread.currentThread().getName(),
mq.getTopic(),
mq.getTags(),
new String(mq.getBody()));
System.out.println();
});
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
// 返回消费状态
// SUCCESS 消费成功
// SUSPEND_CURRENT_QUEUE_A_MOMENT 消费失败,暂停当前队列的消费
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 调用 start() 方法启动 consumer
consumer.start();
System.out.println("Consumer Started.");
}
}
分区顺序消息生产者(Producer)发送结果:
分区顺序消息消费者(Consumer)消费结果:
顺序消息缺陷
消息类型对比
Topic 的消息类型 | 是否支持事务消息 | 是否支持定时/延时消息 | 性能 |
---|---|---|---|
无序消息(普通、事务、定时/延时消息) | 是 | 是 | 最高 |
分区顺序消息 | 否 | 否 | 高 |
全局顺序消息 | 否 | 否 | 一般 |
发送方式对比
消息类型 | 是否支持可靠同步发送 | 是否支持可靠异步发送 | 是否支持 Oneway 发送 |
---|---|---|---|
无序消息(普通、事务、定时/延时消息) | 是 | 是 | 是 |
分区顺序消息 | 是 | 否 | 否 |
全局顺序消息 | 是 | 否 | 否 |
参考资料
阿里云:https://help.aliyun.com/document_detail/49319.html?spm=a2c4g.11186623.6.553.5dd918fdfmtTSh
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/itanping/article/details/101070072
内容来源于网络,如有侵权,请联系作者删除!