文章11 | 阅读 6386 | 点赞0
楔子:翻了帖子两三天,硬是没有找到哪个帖子能证明生产端的消息重试是确实重试了的。大多要么是对概念、源码说明了一下,或者把实现示例贴贴,但基本并没有有效测试证明。想了想,还是自己来捋一捋这 RocketMQ 的消息重试机制。
由于 MQ 经常处于庞大的分布式系统中,考虑到网络波动、服务宕机、程序异常等因素,很可能会出现消息发送或者消费失败的问题。因此,如果没有消息重试,就有可能造成消息丢失,最终影响到系统某些业务或流程。所以,大部分消息中间件都对消息重试提供了很好的支持。RocketMQ 消息重试分为两种:**Producer 发送重试 **和 Consumer 消费重试。
**也叫消息重投。**一般由于网络抖动等原因,Producer 向 Broker 发送消息时没有成功,导致最终 Consumer 无法消费消息,此时 RocketMQ 会自动进行消息重试/重投。我们可以手动设置发送失败时的重试次数。默认为 2 次,但加上程序本身的 1 次发送,如果失败,总共会发送 3 次,也就是 N + 1 次。N 为 retryTimesWhenSendFailed。
验证前,我们先来撸一下源码:
private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// ...源码省略...
// 获取当前时间
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
// 去服务器看下有没有主题消息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if(topicPublishInfo != null && topicPublishInfo.ok()) {
// ...源码省略...
// 通过这里可以很明显看出,如果是同步消息,则重试 变量值+1次;如果不是同步发送消息,那么消息重试只有1次
int timesTotal = communicationMode == CommunicationMode.SYNC?1 + this.defaultMQProducer.getRetryTimesWhenSendFailed():1;
// 重试累计次数
int times = 0;
String[] brokersSent = new String[timesTotal];
while(true) {
label129: {
String info;
// 如果重试累计次数 小于 总重试次数阀值,则轮询获取服务器主题消息
if(times < timesTotal) {
info = null == mq?null:mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
if(mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mqSelected.getBrokerName();
long endTimestamp;
try {
beginTimestampPrev = System.currentTimeMillis();
if(times > 0) {
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
// 消息投送耗时
long costTime = beginTimestampPrev - beginTimestampFirst;
// 如果 消息投送耗时 小于等于 超时时间,则向 Broker 进行消息重投;否则,超时
if(timeout >= costTime) {
// 调用sendKernelImpl开始发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
// ...源码省略...
default:
break label129;
}
}
// 设置超时
callTimeout = true;
} catch (RemotingException var26) {
// ...源码省略...
// 当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
break label129;
} catch (MQClientException var27) {
// ...源码省略...
// 当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
break label129;
} catch (MQBrokerException var28) {
// ...源码省略...
// 当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
switch(var28.getResponseCode()) {
case 1:
case 14:
case 16:
case 17:
case 204:
case 205:
break label129;
default:
if(sendResult != null) {
return sendResult;
} else {
throw var28;
}
}
} catch (InterruptedException var29) {
// 源码省略......
throw var29;
}
}
}
if(sendResult != null) {
return sendResult;
}
// 重试日志
info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", new Object[]{Integer.valueOf(times), Long.valueOf(System.currentTimeMillis() - beginTimestampFirst), msg.getTopic(), Arrays.toString(brokersSent)});
info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
MQClientException mqClientException = new MQClientException(info, (Throwable)exception);
// 如果是消息发送/重试/重投超时,则抛出异常。如果还有重试次数,该异常不会再对该消息进行重试
if(callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
// ...源码省略...
// 默认走 MQClientException 异常
throw mqClientException;
}
// 重试次数累加
++times;
}
} else {
// 如果没有可用 Topic,且 NamesrvAddr 地址列表不为空,则构建 MQClientException,没有重试
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if(null != nsList && !nsList.isEmpty()) {
throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
} else {
// 如果没有可用 Topic,且 NamesrvAddr 地址列表为空,则构建 MQClientException,没有重试
throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
}
}
}
从 DefaultMQProducer 源码分析可以看出:
生产者重试几次?
什么时候重试?
怎么重试?
每次重试都会重新进行负载均衡(会考虑发送失败的因素),使用 selectOneMessageQueue 重新选择 MessageQueue,这样增大发送消息成功的可能性。
隔多久重试?
立即重试,中间没有单独的间隔时间。见源码 真死循环 while(true) 的 sendDefaultImpl 方法,里面有个 label129 标记,只要上一次发送消息后被标记为 label129,就会立马进行下一次消息重投,没有时间间隔。
配置一个不存在的 nameServer 地址,实现一个设置重试次数 retryTimesWhenSendFailed 为 2,但总共会重试/重投 3 次(因为 N + 1)的消息生产者:
public class RetryMultiMqProducer {
// Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
// Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
private static final String MQ_CONFIG_TAG_RETRY = "PID_MEIWEI_SMS_RETRY_PRODUCER";
public static void main(String[] args) throws Exception {
// 创建一个 producer 生产者
DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-retry");
// 指定 NameServer 地址列表,多个 nameServer 地址用半角分号隔开。此处应改为实际 NameServer 地址
// NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877");
// 设置重试次数,默认情况下是2次重试
// 虽然默认2次,但算上程序本身的1次,其实有3次机会,即如果本身的1次发送成功,2次重试机制就不重试了;如果本身的1次发送失败,则再执行这2次重试机会
producer.setRetryTimesWhenSendFailed(2);
// 设置超时时长,默认情况下是3000毫秒,即3秒
producer.setSendMsgTimeout(1000);
// 在发送MQ消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
// 循环发送MQ测试消息
String content = "";
for (int i = 0; i < 5; i++) {
try {
content = "【MQ测试消息】测试消息 " + i;
// 构建一条消息
Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_RETRY, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息,发送消息到一个 Broker。默认以同步方式发送
SendResult sendResult = producer.send(message);
// 消息发送成功
System.out.printf("Send MQ message success! Topic: %s, Tag: %s, MsgId: %s, Message: %s %n",
message.getTopic(), message.getTags(), sendResult.getMsgId(), new String(message.getBody()));
} catch (Exception e) {
// 只有当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
System.out.printf(new Date() + ", 异常信息:%s %n", e);
Thread.sleep(1000);
}
}
// 在发送完消息之后,销毁 Producer 对象。如果不销毁也没有问题
producer.shutdown();
}
}
正常开启 RocketMQ 服务,启动生产者:
从生产者输出的日志可以看到,后面的 4 条消息各重试了 3 次:
org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [1]ms, Topic: TOPIC_MEIWEI_SMS_NOTICE_TEST, BrokersSent: [YYW-SH-PC-1454, YYW-SH-PC-1454, YYW-SH-PC-1454]
再看看 RocketMQ 日志。如果是 Windows 安装的 RocketMQ,且使用的是默认日志配置,则可以在路径 C:\Users\yourname\logs\rocketmqlogs 下查看 rocketmq_client.log
日志文件 rocketmq_client.log 输出了源码中的日志:sendKernelImpl exception, resend at once, InvokeID
Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:
**只有在消息模式为 MessageModel.CLUSTERING 集群模式时,Broker 才会自动进行重试,广播消息模式下不会自动进行重试。**消费者消费消息后,需要给 Broker 返回消费状态。以 MessageListenerConcurrently 监听器为例,Consumer 消费完成后需要返回 ConsumeConcurrentlyStatus 消费状态。
RocketMQ 会为每个消费组都设置一个 Topic 名称为 “%RETRY%+consumerGroup” 的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为 “SCHEDULE_TOPIC_XXXX” 的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至 “%RETRY%+consumerGroup” 的重试队列中。
ConsumeConcurrentlyStatus 有 消费成功 和 消费失败 两种状态:
public enum ConsumeConcurrentlyStatus {
// 消费成功
CONSUME_SUCCESS,
// 消费失败,需要稍后重新消费
RECONSUME_LATER;
private ConsumeConcurrentlyStatus() {
}
}
Consumer 端的重试包括两种情况:
因此,如果 Consumer 端正常消费成功,一定要返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态。
public class RetryExceptionMqProducer {
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
private static final String MQ_CONFIG_TAG_RETRY = "PID_MEIWEI_SMS_RETRY_EXCEPTION";
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-retry");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String content = "【MQ测试消息】测试消息 ";
Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_RETRY, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息,发送消息到一个 Broker。默认以同步方式发送
SendResult sendResult = producer.send(message);
System.out.printf("Send MQ message success! Topic: %s, Tag: %s, MsgId: %s, Message: %s %n",
message.getTopic(), message.getTags(), sendResult.getMsgId(), new String(message.getBody()));
producer.shutdown();
}
}
实现一个设置了最大重试次数 maxReconsumeTimes 为 4,但业务异常中有重试阀值 3,满足阀值条件,则返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 不再重试的消费者:
public class Retry4ExceptionMqConsumer {
// Message 所属的 Topic 一级分类,须要与提供者的频道保持一致才能消费到消息内容
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_RETRY_EXCEPTION";
public static void main(String[] args) throws Exception {
// 声明并初始化一个 consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-retry-exception");
// 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
consumer.setNamesrvAddr("127.0.0.1:9876");
// 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH);
// 设置最大重试数次
consumer.setMaxReconsumeTimes(4);
// 注册一个监听器,主要进行消息消费的逻辑处理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 获取消息
MessageExt msg = list.get(0);
try {
// 获取重试次数
int reconsumeTimes = msg.getReconsumeTimes() + 1;
System.out.printf(new Date() + ",第 %s 次轮询消费 %n", reconsumeTimes);
// 模拟业务逻辑。此处为超过最大重试次数,自动标记消息消费成功
if (reconsumeTimes >= 3) {
System.out.printf(new Date() + ",超过最大重试次数,自动标记消息消费成功 Topic: %s, Tags: %s, Message: %s %n",
msg.getTopic(), msg.getTags(), new String(msg.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 模拟异常发生
int num = 1 / 0;
System.out.printf(new Date() + ",第 %s 次正常消费 %n", reconsumeTimes);
// 返回消费状态
// CONSUME_SUCCESS 消费成功
// RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 获取重试次数
int reconsumeTimes = msg.getReconsumeTimes() + 1;
System.out.printf(new Date() + ",第 %s 次重试消费,异常信息:%s %n", reconsumeTimes, e);
// 每次重试时间间隔遵循延时等级递增:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
// 调用 start() 方法启动 consumer
consumer.start();
System.out.println("Retry Consumer Started.");
}
}
这里的超时重试并非真正意义上的超时,它是说获取消息后,因为某种原因没有给 RocketMQ 返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS 或 return ConsumeConcurrentlyStatus.RECONSUME_LATER。这种情况 MQ 会无限制的发送消息给消费端,因为 RocketMQ 会认为该消息没有发送,所以会一直发送。
public class Retry4TimeoutMqConsumer {
// Message 所属的 Topic 一级分类,须要与提供者的频道保持一致才能消费到消息内容
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_RETRY_TIMEOUT";
public static void main(String[] args) throws Exception {
// 创建一个 consumer 消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-retry-timeout");
// 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
consumer.setNamesrvAddr("127.0.0.1:9876");
// 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH);
// 设置消费超时时间(默认值15L,为15分钟)
consumer.setConsumeTimeout(1L);
// 设置最大重试数次
consumer.setMaxReconsumeTimes(2);
// 注册一个监听器,主要进行消息消费的逻辑处理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 获取消息
MessageExt msg = list.get(0);
try {
// 获取重试次数
int reconsumeTimes = msg.getReconsumeTimes() + 1;
if (reconsumeTimes == 1) {
// 模拟操作:设置一个大于上面已经设置的消费超时时间 来验证超时重试场景(setConsumeTimeout(1L))
System.out.println("---------- 服务暂停 ---------- " + new Date());
Thread.sleep(1000 * 60 * 2);
} else {
System.out.println("---------- 重试消费 ---------- " + new Date());
}
System.out.printf(new Date() + " 第 %s 次重试消费:Topic: %s, Tags: %s, MsgId: %s, Message: %s %n",
reconsumeTimes, msg.getTopic(), msg.getTags(), msg.getMsgId(), new String(msg.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
System.out.printf(new Date() + ",异常信息:%s %n", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
// 调用 start() 方法启动 consumer
consumer.start();
System.out.println("Retry Timeout Consumer Started.");
}
}
测试期间,当控制台输出日志 “服务暂停” 后,关闭当前消费者:
再次开启该消费者:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/itanping/article/details/102508701
内容来源于网络,如有侵权,请联系作者删除!