RocketMQ:Producer启动流程与消息发送源码分析

x33g5p2x  于2021-11-22 转载在 其他  
字(23.1k)|赞(0)|评价(0)|浏览(373)

Producer

在RocketMQ中,消息生产者就是客户端,即消息的提供者。

以下是消息生产者Producer项目预览图:

1.方法和属性

Producer的相关核心类:

MQAdmin接口方法介绍:

//创建主题
void createTopic(final String key, final String newTopic, final int queueNum)
    throws MQClientException;
//根据时间戳从队列中查找消息偏移量
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
//查找消息队列中最大/小偏移量
long maxOffset(final MessageQueue mq) throws MQClientException;
long minOffset(final MessageQueue mq) throws MQClientException;
//根据偏移量查找信息
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
    InterruptedException, MQClientException;
//根据条件查询消息
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
    final long end) throws MQClientException, InterruptedException;
//根据主题和消息ID查询消息
MessageExt viewMessage(String topic,
    String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

MQProducer接口方法介绍:

//启动
void start() throws MQClientException;
//关闭
void shutdown();
//查找该主题下的所有消息队列
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
//同步发送消息
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
    InterruptedException;
//同步超时发送消息
SendResult send(final Message msg, final long timeout) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;
//异步发送消息
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
    RemotingException, InterruptedException;
//异步并附带超时时间的消息发送
void send(final Message msg, final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, InterruptedException;
//发送单向消息-无需关注返回结果-void
void sendOneway(final Message msg) throws MQClientException, RemotingException,
    InterruptedException;
//同步并指定消息队列发送消息
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
    RemotingException, MQBrokerException, InterruptedException;
//选择指定队列异步发送消息
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
    throws MQClientException, RemotingException, InterruptedException;
//批量发送消息
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;

DefaultMQProducer:

属性介绍:

producerGroup:生产者所属组
createTopicKey:默认Topic
defaultTopicQueueNums:默认主题在每一个Broker队列数量
sendMsgTimeout:发送消息默认超时时间,默认3s
compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k
retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为2,总共执行3次
retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2
retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false
maxMessageSize:允许发送的最大消息长度,默认为4M

2.启动流程

启动时序图如下:

DefaultMQProducerImpl#start

switch (this.serviceState) {
    case CREATE_JUST:
        this.serviceState = ServiceState.START_FAILED;
	    //检查生产者组配置
        this.checkConfig();

        //生产组名=CLIENT_INNER_PRODUCER
        if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
            //将生产者实例名称改为PID
            this.defaultMQProducer.changeInstanceNameToPID();
        }

        //获得MQ客户端实例
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

        //注册生产者到MQClientInstance中并返回注册结果
        boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
        if (!registerOK) {
            this.serviceState = ServiceState.CREATE_JUST;
            throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                null);
        }

        //存入Topic主题发布信息
        this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

        //如果是初次启动
        if (startFactory) {
            //启动MQ客户端
            mQClientFactory.start();
        }

        log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
            this.defaultMQProducer.isSendMessageWithVIPChannel());
        //将服务状态改为RUNNING
        this.serviceState = ServiceState.RUNNING;
        break;

MQClientManager

//单例-一个JVM中只存在一个MQClientManager实例
private static MQClientManager instance = new MQClientManager();
//MQClientManager-维护一个MQClientInstance缓存表
//同一个clientId只会对应一个MQClientInstance
//MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();

MQClientManager#getOrCreateMQClientInstance

//构建MQClientId
String clientId = clientConfig.buildMQClientId();
//在缓存表中查询是否存在instance
MQClientInstance instance = this.factoryTable.get(clientId);
//instance不存在
if (null == instance) {
    //构建instance
    instance =
        new MQClientInstance(clientConfig.cloneClientConfig(),
            this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
    //存入表中
    MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
    if (prev != null) {
        instance = prev;
        log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    } else {
        log.info("Created new MQClientInstance for clientId:[{}]", clientId);
    }
}

return instance;

3.消息发送

消息发送时序图:

DefaultMQProducerImpl#send(Message msg)

/** * DEFAULT SYNC ------------------------------------------------------- */
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

DefaultMQProducer#send(Message msg,long timeout)

/** * * @param msg * @param timeout 默认超时时长为3s * @return 返回发送结果 */
public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

DefaultMQProducerImpl#sendDefaultImpl

3.1验证消息
//验证消息
Validators.checkMessage(msg, this.defaultMQProducer);

Validators#checkMessage

//检查是否为空
if (null == msg) {
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic-校验主题
Validators.checkTopic(msg.getTopic());
//是否是禁止发送的消息主题
Validators.isNotAllowedSendTopic(msg.getTopic());

// body-检查是否为空
if (null == msg.getBody()) {
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 == msg.getBody().length) {
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
//body-检查消息体是否大于消息最大限制大小
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
        "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
3.2查找路由

DefaultMQProducerImpl#tryToFindTopicPublishInfo

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

DefaultMQProducerImpl#tryToFindTopicPublishInfo

//在本地缓存中获取主题的路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//若路由信息为空 || !ok() -> !(null != this.messageQueueList && !this.messageQueueList.isEmpty())
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    //为主题创建路由信息-存入缓存表
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    //从nameServer中获取路由信息
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
//如果查询出的Info合法-返回Info
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    return topicPublishInfo;
} else {
    //否则将从nameServer获取的路由信息更新到缓存表中
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
    return topicPublishInfo;
}

TopicPublishInfo

public class TopicPublishInfo {
    private boolean orderTopic = false;			//是否是顺序消息
    private boolean haveTopicRouterInfo = false; //是否有路由信息
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();		//该主题的消息队列
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();			//每选择一次消息队列该值+1
    private TopicRouteData topicRouteData;		//关联Topic路由元信息
}

TopicRouteData

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;			//顺序消息配置
    private List<QueueData> queueDatas;		//Broker队列信息
    private List<BrokerData> brokerDatas;	//Broker信息
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;		//消息过滤表
}

MQClientInstance#updateTopicRouteInfoFromNameServer

//this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)-加锁
TopicRouteData topicRouteData;
//使用默认主题从NameServer获取路由信息
if (isDefault && defaultMQProducer != null) {
    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
        clientConfig.getMqClientApiTimeout());
    if (topicRouteData != null) {
        for (QueueData data : topicRouteData.getQueueDatas()) {
            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
            data.setReadQueueNums(queueNums);
            data.setWriteQueueNums(queueNums);
        }
    }
} else {
     //使用指定主题从NameServer获取路由信息
    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
if (changed) {
    //克隆出一份主题路由信息
    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
    }

    // Update Pub info
    {
        //将topicRouteData转化为TopicPublishInfo
        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
        //是否有主题路由信息-设置为true
        publishInfo.setHaveTopicRouterInfo(true);
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        //遍历生产者
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            //如果生产者不为空
            if (impl != null) {
                //更新publishInfo信息
                impl.updateTopicPublishInfo(topic, publishInfo);
            }
        }
    }

    // Update sub info
    {
        //主题订阅信息-消息消费队列
        Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
        //遍历消费者
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            //消费者不为空
            if (impl != null) {
                //更新subscribeInfo信息
                impl.updateTopicSubscribeInfo(topic, subscribeInfo);
            }
        }
    }
    log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
    this.topicRouteTable.put(topic, cloneTopicRouteData);
    return true;
}

MQClientInstance#topicRouteData2TopicPublishInfo

public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
    //创建TopicPublishInfo对象
    TopicPublishInfo info = new TopicPublishInfo();
    //关联TopicRouteData信息
    info.setTopicRouteData(route);
    //顺序消息,更新TopicPublishInfo
    if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
        //获取Broke列表-用分号隔开
        String[] brokers = route.getOrderTopicConf().split(";");
        for (String broker : brokers) {
            String[] item = broker.split(":");
            int nums = Integer.parseInt(item[1]);
            for (int i = 0; i < nums; i++) {
                MessageQueue mq = new MessageQueue(topic, item[0], i);
                info.getMessageQueueList().add(mq);
            }
        }
		//设置为顺序消息
        info.setOrderTopic(true);
    } else {
         //非顺序消息更新TopicPublishInfo
        //获取消息队列信息
        List<QueueData> qds = route.getQueueDatas();
        Collections.sort(qds);
        //遍历topic队列信息
        for (QueueData qd : qds) {
            //权限为可写
            if (PermName.isWriteable(qd.getPerm())) {
                BrokerData brokerData = null;
                //遍历写队列
                for (BrokerData bd : route.getBrokerDatas()) {
                    //根据名称获取写队列对应的broker
                    if (bd.getBrokerName().equals(qd.getBrokerName())) {
                        brokerData = bd;
                        break;
                    }
                }

                if (null == brokerData) {
                    continue;
                }

                if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                    continue;
                }

                //填充TopicPublishInfo消息队列列表
                for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    info.getMessageQueueList().add(mq);
                }
            }
        }

        //顺序消息设置为false
        info.setOrderTopic(false);
    }

    return info;
}
3.3选择队列
//DefaultMQProducerImpl#sendDefaultImpl
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
//DefaultMQProducerImpl#selectOneMessageQueue
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);

MQFaultStrategy#selectOneMessageQueue

//是否启用Broker故障延迟机制
if (this.sendLatencyFaultEnable) {
    try {
        int index = tpInfo.getSendWhichQueue().incrementAndGet();
        for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
            int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
            if (pos < 0) {
                pos = 0;
            }
            MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
            if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                return mq;
            }
        }

        final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
        int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
        if (writeQueueNums > 0) {
            final MessageQueue mq = tpInfo.selectOneMessageQueue();
            if (notBestBroker != null) {
                mq.setBrokerName(notBestBroker);
                mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
            }
            return mq;
        } else {
            latencyFaultTolerance.remove(notBestBroker);
        }
    } catch (Exception e) {
        log.error("Error occurred when selecting message queue", e);
    }

    return tpInfo.selectOneMessageQueue();
}
//不启用Broker故障延迟机制
return tpInfo.selectOneMessageQueue(lastBrokerName);

默认不启用Broker故障延迟机制

TopicPublishInfo#selectOneMessageQueue(final String lastBrokerName)

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //第一次选择消息队列
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            //选择一次-sendWhichQueue自增
            int index = this.sendWhichQueue.incrementAndGet();
            //取模
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0) {
                pos = 0;
            }
            //轮询选择消息队列
            MessageQueue mq = this.messageQueueList.get(pos);
            //规避上次选择的队列
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

TopicPublishInfo#selectOneMessageQueue()

//第一次选择消息队列
public MessageQueue selectOneMessageQueue() {
    //sendWhichQueue自增
    int index = this.sendWhichQueue.incrementAndGet();
    //对队列大小取模
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0) {
        pos = 0;
    }
    //返回对应的队列
    return this.messageQueueList.get(pos);
}

启用Broker故障延迟机制

DefaultMQProducerImpl#selectOneMessageQueue

//Broker故障延迟机制
if (this.sendLatencyFaultEnable) {
    try {
         //对sendWhichQueue自增
        int index = tpInfo.getSendWhichQueue().incrementAndGet();
        //对消息队列轮询获取一个队列
        for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
            int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
            if (pos < 0) {
                pos = 0;
            }
            MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
            //验证该队列是否可用->可用即返回-不可用继续轮询
            if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                return mq;
            }
        }

        //没有选出较为合适的消息队列->让延迟容错机制至少选出一个Broker出来
        final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
        //写队列个数
        int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
        //写队列个数大于0
        if (writeQueueNums > 0) {
            //选出一个消息队列->指定broker和队列ID并返回
            final MessageQueue mq = tpInfo.selectOneMessageQueue();
            if (notBestBroker != null) {
                mq.setBrokerName(notBestBroker);
                mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
            }
            return mq;
        } else {
            //该Broker也不可用->从容错队列中移除该Broker
            latencyFaultTolerance.remove(notBestBroker);
        }
    } catch (Exception e) {
        log.error("Error occurred when selecting message queue", e);
    }
    return tpInfo.selectOneMessageQueue();
}

  • 延迟机制接口规范
public interface LatencyFaultTolerance<T> {
    //更新失败条目
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
	//Broker是否可用
    boolean isAvailable(final T name);
	//移除Fault条目
    void remove(final T name);
	//尝试从规避的Broker中选出一个可用的Broker
    T pickOneAtLeast();
}
  • FaultItem:失败条目
class FaultItem implements Comparable<FaultItem> {
    //条目唯一key->BrokerName
    private final String name;
    //本次发送消息延迟
    private volatile long currentLatency;
    //故障规避开始时间
    private volatile long startTimestamp;
}
  • 消息失败策略
public class MQFaultStrategy {
   //根据currentLatency本地消息发送延迟,从latencyMax尾部向前找到第一个比currentLatency小的索引,如果没有找到,返回0
	private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    //根据这个索引从notAvailableDuration取出对应的时间,在该时长内,Broker设置为不可用
	private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
}

DefaultMQProducerImpl#sendDefaultImpl

//消息发送->发送成功调用回调函数sendCallback
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
//调用失败记录失败时间戳
endTimestamp = System.currentTimeMillis();
//更新调用失败条目
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

MQFaultStrategy#updateFaultItem

if (this.sendLatencyFaultEnable) {
    //计算broker规避的时长
    long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    //更新该FaultItem规避时长
    this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}

MQFaultStrategy#computeNotAvailableDuration

//遍历latencyMax
for (int i = latencyMax.length - 1; i >= 0; i--) {
    //找到第一个比currentLatency的latencyMax值
    if (currentLatency >= latencyMax[i])
        return this.notAvailableDuration[i];
}
//没有找到则返回0
return 0;

LatencyFaultToleranceImpl#updateFaultItem

//原来的失败条目信息
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
     //失败条目为空->新建faultItem对象设置规避时长和开始时间
    final FaultItem faultItem = new FaultItem(name);
    faultItem.setCurrentLatency(currentLatency);
    faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

    old = this.faultItemTable.putIfAbsent(name, faultItem);
    if (old != null) {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
} else {
    //不为空->更新规避时长和开始时间
    old.setCurrentLatency(currentLatency);
    old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
3.4发送消息

DefaultMQProducerImpl#sendKernelImpl

private SendResult sendKernelImpl(final Message msg,	//待发送消息
    final MessageQueue mq,							  //消息发送队列 
    final CommunicationMode communicationMode,			//消息发送模式->ASYNC/SYNC/ONEWAY
    final SendCallback sendCallback,				   //异步消息回调函数
    final TopicPublishInfo topicPublishInfo,		    //主题路由信息 
    final long timeout)								  //消息发送超时时间
//根据BrokerName获取Broker地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
//地址为空
if (null == brokerAddr) {
    //更新broker网络地址信息
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
//非批量消息发送->设置消息唯一ID
//批量消息->在消息打包过程中已经生成唯一ID
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}

boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
    topicWithNamespace = true;
}

int sysFlag = 0;
boolean msgBodyCompressed = false;
//大于4k->进行压缩
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}
//如果是事务消息,设置消息标记MessageSysFlag.TRANSACTION_PREPARED_TYPE
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
//如果注册消息发送钩子函数->消息发送之前进行逻辑增强
if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }

    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
}

SendMessageHook

public interface SendMessageHook {
    String hookName();

    void sendMessageBefore(final SendMessageContext context);

    void sendMessageAfter(final SendMessageContext context);
}
//构建消息发送请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//生产者组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());  
//主题
requestHeader.setTopic(msg.getTopic());
//创建默认主题
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
//该主题默认主题队列个数4
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
//队列ID
requestHeader.setQueueId(mq.getQueueId());
//消息系统标识
requestHeader.setSysFlag(sysFlag);
//消息发送时间戳
requestHeader.setBornTimestamp(System.currentTimeMillis());
//消息标记
requestHeader.setFlag(msg.getFlag());
//消息拓展信息
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
//消息重试次数->初始为0
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
//是否是批量消息
requestHeader.setBatch(msg instanceof MessageBatch);
//如果是发送重试消息
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    //对消息重试次数进行更新
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }

    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}
SendResult sendResult = null;
switch (communicationMode) {
    //异步发送 
    case ASYNC:
        Message tmpMessage = msg;
        boolean messageCloned = false;
        //如果消息体被压缩
        if (msgBodyCompressed) {
		   //msgBody应该使用prevBody
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
        }

        if (topicWithNamespace) {
            if (!messageCloned) {
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }

        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        //发送消息并返回结果
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
    case ONEWAY:
    //同步发送 
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}
//如果注册了钩子函数->发送完毕后执行钩子函数
if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}
3.5发送批量消息

批量消息发送时序图:

批量消息发送是将一个主题的多条消息进行打包后发送到消息消费端,以此减少网络调用,提高网络传输以及消息发送效率。但是,同一批次的消息数量不是越多越好,如果消息内容过长,则打包消息过程中会导致占用线程资源时间过长,从而导致其他线程发送消息响应时间过长,并且单批次消息总长度不能超过DefaultMQProducer#maxMessageSize -> 4M

DefaultMQProducer#send

public SendResult send(
    Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //调用batch方法将消息进行打包后进行发送
    return this.defaultMQProducerImpl.send(batch(msgs));
}
//继承Message->其实就是用一个List将多个消息存封装起来->上述代码中的batch方法作用就是否则将消息封装成MessageBatch
public class MessageBatch extends Message implements Iterable<Message> {

    private static final long serialVersionUID = 621335151046335557L;
    private final List<Message> messages;
}

DefaultMQProducer#batch

MessageBatch msgBatch;
try {
    //将消息集合封装到MessageBatch.messages
    msgBatch = MessageBatch.generateFromList(msgs);
    //遍历消息
    for (Message message : msgBatch) {
        //对消息一一进行检查
        Validators.checkMessage(message, this);
        //对每个消息设置唯一ID和Topic
        MessageClientIDSetter.setUniqID(message);
        message.setTopic(withNamespace(message.getTopic()));
    }
    //编码后存入Message.body
    msgBatch.setBody(msgBatch.encode());
} catch (Exception e) {
    throw new MQClientException("Failed to initiate the MessageBatch", e);
}
//设置msgBatch的主题Topic
msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
return msgBatch;

将消息封装成MessageBatch之后的消息发送步骤跟单条消息的发送步骤完全一致,至此消息发送已经完成。

以上。

本文仅作为个人学习使用,水平有限,如有错误请指正!

相关文章