RocketMQ进击(六)磕一磕RocketMQ的事务消息和事务性消息的生产与消费

x33g5p2x  于2021-12-19 转载在 其他  
字(10.5k)|赞(0)|评价(0)|浏览(406)

楔子:有句老话在电视上大概已经听得生茧:我们不成功便成仁。最终是要完成任务。

1. 不成功便成仁

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。通过事务消息达到分布式事务的最终一致。

Apache RocketMQ 在 4.3.0 版中已经支持分布式事务消息,它采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。如下图所示:

上图说明了事务消息的大致方案,其中可以分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

一】事务消息发送及提交:

1)Producer 向 Broker 发送消息(half 消息)
2)服务端响应消息写入结果
3)Producer 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行)
4)Producer 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)

二】补偿流程:

5)对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”
6)Producer 收到回查消息,检查回查消息对应的本地事务的状态
7)根据本地事务状态,重新 Commit 或者 Rollback

其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者异常失败的情况。

2. 闲时二三闲事

2.1. 源码定义

2.1.1 事务的三种状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown:中间状态,它代表需要检查消息队列来确定状态。
package org.apache.rocketmq.client.producer;

public enum LocalTransactionState {
    COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW;

    private LocalTransactionState() {
    }
}

2.2. 代码示例

这里通过一个常用场景简单模拟 RocketMQ 的事务消息:写2个微服务,分别是订单服务和商品服务。订单服务进行下单处理,并发送消息给商品服务,商品服务对下单成功的商品进行扣减库存。

2.2.1 创建事务性生产者

首先,写一个简易订单服务,使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行状态回传。回传的三种事务状态如上所述。

package com.meiwei.service.mq.tcp.producer;

import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 订单服务
 */
public class OrderService {

    // Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
    // Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
    // Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
    private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_ORDER_TRANSACTION";

    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer();

        // 指定 NameServer 地址列表,多个nameServer地址用半角分号隔开。此处应改为实际 NameServer 地址
        // NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setProducerGroup("meiwei-producer-transaction-mq");

        // 自定义线程池,执行事务操作
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("meiwei-order-service-transaction-msg-check");
                return thread;
            }
        });
        producer.setExecutorService(executor);

        // 设置事务消息监听器
        producer.setTransactionListener(new OrderTransactionListener());

        // 在发送MQ消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();
        System.out.println("Order Server Start.");

        // 模拟业务
        for (int i = 0; i < 5; i++) {
            String orderId = System.currentTimeMillis() + "";
            String payOrder = "下单完成,订单编号:" + orderId;
            Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH, orderId, payOrder.getBytes(RemotingHelper.DEFAULT_CHARSET));

            // 发送事务消息
            TransactionSendResult result = producer.sendMessageInTransaction(message, orderId);
            System.out.println("【发送事务消息】发送结果:" + result);

            Thread.sleep(100);
        }
    }
}

2.2.2 实现事务的监听接口

事务消息需要注册一个 TransactionListener,进行本地事务的执行和事务回查,代码如下:

package com.meiwei.service.mq.tcp.producer;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 订单事务消息监听器
 */
public class OrderTransactionListener implements TransactionListener {

    private static final Map<String, Integer> statusMap = new ConcurrentHashMap<>();

    // 执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String orderId = (String) o;

        // 记录本地事务执行结果
        Integer status = this.executeTransactionResult(orderId);
        System.out.println("【订单事务消息监听器】本地有新订单,orderId: " + orderId + ", result: " + status);

        // 返回中间状态,需要检查消息队列来确定状态,即触发 checkLocalTransaction
        return LocalTransactionState.UNKNOW;
    }

    // 检查本地事务状态,并回应消息队列的检查请求
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        Integer status = statusMap.get(messageExt.getKeys());
        System.out.println("【订单事务消息监听器】执行事务消息回查,orderId: " + messageExt.getKeys() + ", result: " + status + ",时间: " + new Date());

        if (null != status) {
            switch (status) {
                case 0:
                    // 中间状态,它代表需要检查消息队列来确定状态
                    return LocalTransactionState.UNKNOW;
                case 1:
                    // 提交事务,它允许消费者消费此消息
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    // 回滚事务,它代表该消息将被删除,不允许被消费
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    // 模拟一个业务场景,并返回订单处理状态 
    private Integer executeTransactionResult(String orderId) {
        Integer status = Math.toIntExact(Long.valueOf(orderId) % 3);
        statusMap.put(orderId, status);
        return status;
    }
}

2.2.3 创建事务性消费者

再次,写一个简易的商品服务,接收订单服务的事务消息,如果消息成功 commit,则进行本地扣减库存。

package com.meiwei.service.mq.tcp.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

/**
 * 商品服务
 */
public class DrugsService {

    // Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
    // Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
    // Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
    private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_ORDER_TRANSACTION";

    public static void main(String[] args) throws Exception {
        // 声明并初始化一个 consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        // 指定 NameServer 地址列表,多个nameServer地址用半角分号隔开。此处应改为实际 NameServer 地址
        // NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumerGroup("meiwei-consumer-transaction-mq");
        // 设置 consumer 所订阅的 Topic 和 Tag,这里的 Topic 需要与生产者保持一致
        consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH);
        // 注册一个消息监听器消费消息
        consumer.registerMessageListener(new DrugsMqConsumerListener());

        consumer.start();
        System.out.println("Drugs Service Start.");
    }
}

2.2.4 实现消息消费监听器

package com.meiwei.service.mq.tcp.consumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
 * 商品消费端监听器
 */
public class DrugsMqConsumerListener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        Optional.ofNullable(list).orElse(Collections.emptyList()).forEach(msg -> {
            String orderId = msg.getKeys();
            System.out.println("【商品消费端监听器】您有新订单,orderId: " + orderId + ",商品库存需要更新");
        });

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

2.3. 测试与结果

订单服务作为生产者发出新订单扣减库存消息:

Order Server Start.
【订单事务消息监听器】本地有新订单,orderId: 1571021207194, result: 1
【发送事务消息】发送结果:SendResult [sendStatus=SEND_OK, msgId=0A06341B156C18B4AAC24542D3BF0000, offsetMsgId=null, messageQueue=MessageQueue [topic=TOPIC_MEIWEI_SMS_NOTICE_TEST, brokerName=YYW-SH-PC-1454, queueId=1], queueOffset=1620]
【订单事务消息监听器】本地有新订单,orderId: 1571021207595, result: 0
【发送事务消息】发送结果:SendResult [sendStatus=SEND_OK, msgId=0A06341B156C18B4AAC24542D42B0001, offsetMsgId=null, messageQueue=MessageQueue [topic=TOPIC_MEIWEI_SMS_NOTICE_TEST, brokerName=YYW-SH-PC-1454, queueId=2], queueOffset=1621]
【订单事务消息监听器】本地有新订单,orderId: 1571021207696, result: 2
【发送事务消息】发送结果:SendResult [sendStatus=SEND_OK, msgId=0A06341B156C18B4AAC24542D4900002, offsetMsgId=null, messageQueue=MessageQueue [topic=TOPIC_MEIWEI_SMS_NOTICE_TEST, brokerName=YYW-SH-PC-1454, queueId=3], queueOffset=1622]
【订单事务消息监听器】本地有新订单,orderId: 1571021207798, result: 2
【发送事务消息】发送结果:SendResult [sendStatus=SEND_OK, msgId=0A06341B156C18B4AAC24542D4F60003, offsetMsgId=null, messageQueue=MessageQueue [topic=TOPIC_MEIWEI_SMS_NOTICE_TEST, brokerName=YYW-SH-PC-1454, queueId=0], queueOffset=1623]
【订单事务消息监听器】本地有新订单,orderId: 1571021207900, result: 2
【发送事务消息】发送结果:SendResult [sendStatus=SEND_OK, msgId=0A06341B156C18B4AAC24542D55C0004, offsetMsgId=null, messageQueue=MessageQueue [topic=TOPIC_MEIWEI_SMS_NOTICE_TEST, brokerName=YYW-SH-PC-1454, queueId=1], queueOffset=1624]
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207194, result: 1,时间: Mon Oct 14 10:46:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:46:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207798, result: 2,时间: Mon Oct 14 10:46:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207696, result: 2,时间: Mon Oct 14 10:46:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207900, result: 2,时间: Mon Oct 14 10:46:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:47:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:48:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:49:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:50:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:51:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:52:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:53:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:54:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:55:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:56:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:57:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:58:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:59:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 11:00:56 CST 2019

商品服务作为消费者监听并消费到扣减库存的消息,后面就可以进行扣减库存的业务处理:

从上面的生产者输出日志可以看到订单服务发出的 5 条消息:

  • 订单状态为 UNKNOW:0 的有 1 条,这条消息被回查了 15 次(transactionCheckMax 默认值 15,可配置)
  • 订单状态为 COMMIT_MESSAGE:1 的有 1 条,且被商品服务正常消费
  • 订单状态为 ROLLBACK_MESSAGE:2 的有 3 条,且未被商品服务消费,也未被回查

2.4. 使用限制

  1. 事务消息不支持延时消息和批量消息。
  2. 事务性消息可能不止一次被检查或消费。
  3. 为了避免单个消息被检查太多次而导致半队列消息累积,源码默认将单个消息的检查次数限制为 15 次,但是我们可以通过 Broker 配置文件的 transactionCheckMax 参数来修改这个限制。如果已经检查某条消息超过 N 次的话(N =  transactionCheckMax),Broker 则将丢弃此消息,并在默认情况下同时打印错误日志。不过可以通过重写  AbstractTransactionCheckListener 类来修改这个行为。
  4. 事务消息 将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后 被检查。不过可以通过设置属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

相关文章