TCC-Transaction 源码分析 —— 事务存储器

x33g5p2x  于2021-12-21 转载在 其他  
字(11.0k)|赞(0)|评价(0)|浏览(388)

1. 概述

本文分享 事务存储器。主要涉及如下 Maven 项目:

  • tcc-transaction-core :tcc-transaction 底层实现。

在 TCC 的过程中,根据应用内存中的事务信息完成整个事务流程。But 实际业务场景中,将事务信息只放在应用内存中是远远不够可靠的。例如:

  1. 应用进程异常崩溃,未完成的事务信息将丢失。
  2. 应用进程集群,当提供远程服务调用时,事务信息需要集群内共享。
  3. 发起事务的应用需要重启部署新版本,因为各种原因,有未完成的事务。

因此,TCC-Transaction 将事务信息添加到内存中的同时,会使用外部存储进行持久化。目前提供四种外部存储:

  • JdbcTransactionRepository,JDBC 事务存储器
  • RedisTransactionRepository,Redis 事务存储器
  • ZooKeeperTransactionRepository,Zookeeper 事务存储器
  • FileSystemTransactionRepository,File 事务存储器

本文涉及到的类关系如下图( 打开大图 ):

你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 TCC-Transaction 点赞!传送门

ps:笔者假设你已经阅读过《tcc-transaction 官方文档 —— 使用指南1.2.x》

2. 序列化

在[《TCC-Transaction 源码分析 —— TCC 实现》「4. 事务与参与者」],可以看到 Transaction 是一个比较复杂的对象,内嵌 Participant 数组,而 Participant 本身也是复杂的对象,内嵌了更多的其他对象,因此,存储器在持久化 Transaction 时,需要序列化后才能存储。

org.mengyun.tcctransaction.serializer.ObjectSerializer,对象序列化接口。实现代码如下:

  1. public interface ObjectSerializer<T> {
  2. byte[] serialize(T t);
  3. T deserialize(byte[] bytes);
  4. }

2.1 JDK 序列化实现

org.mengyun.tcctransaction.serializer.JdkSerializationSerializer,JDK 序列化实现。比较易懂,点击链接直接查看。

TCC-Transaction 使用的默认的序列化

2.2 Kyro 序列化实现

org.mengyun.tcctransaction.serializer.KryoTransactionSerializer,Kyro 序列化实现。比较易懂,点击链接直接查看。

2.3 JSON 序列化实现

JDK 和 Kyro 的序列化实现,肉眼无法直观具体存储事务的信息,你可以通过实现 ObjectSerializer 接口,实现自定义的 JSON 序列化。

3. 存储器

org.mengyun.tcctransaction.TransactionRepository,事务存储器接口。实现代码如下:

  1. public interface TransactionRepository {
  2. /**
  3. * 新增事务
  4. *
  5. * @param transaction 事务
  6. * @return 新增数量
  7. */
  8. int create(Transaction transaction);
  9. /**
  10. * 更新事务
  11. *
  12. * @param transaction 事务
  13. * @return 更新数量
  14. */
  15. int update(Transaction transaction);
  16. /**
  17. * 删除事务
  18. *
  19. * @param transaction 事务
  20. * @return 删除数量
  21. */
  22. int delete(Transaction transaction);
  23. /**
  24. * 获取事务
  25. *
  26. * @param xid 事务编号
  27. * @return 事务
  28. */
  29. Transaction findByXid(TransactionXid xid);
  30. /**
  31. * 获取超过指定时间的事务集合
  32. *
  33. * @param date 指定时间
  34. * @return 事务集合
  35. */
  36. List<Transaction> findAllUnmodifiedSince(Date date);
  37. }

3.1 可缓存的事务存储器抽象类

org.mengyun.tcctransaction.repository.CachableTransactionRepository可缓存的事务存储器抽象类,实现增删改查事务时,同时缓存事务信息。在上面类图,我们也可以看到 TCC-Transaction 自带的多种存储器都继承该抽象类。

CachableTransactionRepository 构造方法实现代码如下:

  1. public abstract class CachableTransactionRepository implements TransactionRepository {
  2. /**
  3. * 缓存过期时间
  4. */
  5. private int expireDuration = 120;
  6. /**
  7. * 缓存
  8. */
  9. private Cache<Xid, Transaction> transactionXidCompensableTransactionCache;
  10. public CachableTransactionRepository() {
  11. transactionXidCompensableTransactionCache = CacheBuilder.newBuilder().expireAfterAccess(expireDuration, TimeUnit.SECONDS).maximumSize(1000).build();
  12. }
  13. }
  • 使用 Guava Cache 内存缓存事务信息,设置最大缓存个数为 1000 个,缓存过期时间为最后访问时间 120 秒。

#create(...) 实现代码如下:

  1. @Override
  2. public int create(Transaction transaction) {
  3. int result = doCreate(transaction);
  4. if (result > 0) {
  5. putToCache(transaction);
  6. }
  7. return result;
  8. }
  9. /**
  10. * 添加到缓存
  11. *
  12. * @param transaction 事务
  13. */
  14. protected void putToCache(Transaction transaction) {
  15. transactionXidCompensableTransactionCache.put(transaction.getXid(), transaction);
  16. }
  17. /**
  18. * 新增事务
  19. *
  20. * @param transaction 事务
  21. * @return 新增数量
  22. */
  23. protected abstract int doCreate(Transaction transaction);
  • 调用 #doCreate(...) 方法,新增事务。新增成功后,调用 #putToCache(...) 方法,添加事务到缓存。
  • #doCreate(...) 为抽象方法,子类实现该方法,提供新增事务功能。

#update(...) 实现代码如下:

  1. @Override
  2. public int update(Transaction transaction) {
  3. int result = 0;
  4. try {
  5. result = doUpdate(transaction);
  6. if (result > 0) {
  7. putToCache(transaction);
  8. } else {
  9. throw new OptimisticLockException();
  10. }
  11. } finally {
  12. if (result <= 0) { // 更新失败,移除缓存。下次访问,从存储器读取
  13. removeFromCache(transaction);
  14. }
  15. }
  16. return result;
  17. }
  18. /**
  19. * 移除事务从缓存
  20. *
  21. * @param transaction 事务
  22. */
  23. protected void removeFromCache(Transaction transaction) {
  24. transactionXidCompensableTransactionCache.invalidate(transaction.getXid());
  25. }
  26. /**
  27. * 更新事务
  28. *
  29. * @param transaction 事务
  30. * @return 更新数量
  31. */
  32. protected abstract int doUpdate(Transaction transaction);
  • 调用 #doUpdate(...) 方法,更新事务。

  • 若更新成功后,调用 #putToCache(...) 方法,添加事务到缓存。

  • 若更新失败后,抛出 OptimisticLockException 异常。有两种情况会导致更新失败:(1) 该事务已经被提交,被删除;(2) 乐观锁更新时,缓存的事务的版本号( Transaction.version )和存储器里的事务的版本号不同,更新失败。为什么?在《TCC-Transaction 源码分析 —— 事务恢复》详细解析。更新失败,意味着缓存已经不不一致,调用 #removeFromCache(...) 方法,移除事务从缓存中。

  • #doUpdate(...) 为抽象方法,子类实现该方法,提供更新事务功能。

#delete(...) 实现代码如下:

  1. @Override
  2. public int delete(Transaction transaction) {
  3. int result;
  4. try {
  5. result = doDelete(transaction);
  6. } finally {
  7. removeFromCache(transaction);
  8. }
  9. return result;
  10. }
  11. /**
  12. * 删除事务
  13. *
  14. * @param transaction 事务
  15. * @return 删除数量
  16. */
  17. protected abstract int doDelete(Transaction transaction);
  • 调用 #doDelete(...) 方法,删除事务。
  • 调用 #removeFromCache(...) 方法,移除事务从缓存中。
  • #doDelete(...) 为抽象方法,子类实现该方法,提供删除事务功能。

#findByXid(...) 实现代码如下:

  1. @Override
  2. public Transaction findByXid(TransactionXid transactionXid) {
  3. Transaction transaction = findFromCache(transactionXid);
  4. if (transaction == null) {
  5. transaction = doFindOne(transactionXid);
  6. if (transaction != null) {
  7. putToCache(transaction);
  8. }
  9. }
  10. return transaction;
  11. }
  12. /**
  13. * 获得事务从缓存中
  14. *
  15. * @param transactionXid 事务编号
  16. * @return 事务
  17. */
  18. protected Transaction findFromCache(TransactionXid transactionXid) {
  19. return transactionXidCompensableTransactionCache.getIfPresent(transactionXid);
  20. }
  21. /**
  22. * 查询事务
  23. *
  24. * @param xid 事务编号
  25. * @return 事务
  26. */
  27. protected abstract Transaction doFindOne(Xid xid);
  • 调用 #findFromCache() 方法,优先从缓存中获取事务。
  • 调用 #doFindOne() 方法,缓存中事务不存在,从存储器中获取。获取到后,调用 #putToCache() 方法,添加事务到缓存中。
  • #doFindOne(...) 为抽象方法,子类实现该方法,提供查询事务功能。

#findAllUnmodifiedSince(...) 实现代码如下:

  1. @Override
  2. public List<Transaction> findAllUnmodifiedSince(Date date) {
  3. List<Transaction> transactions = doFindAllUnmodifiedSince(date);
  4. // 添加到缓存
  5. for (Transaction transaction : transactions) {
  6. putToCache(transaction);
  7. }
  8. return transactions;
  9. }
  10. /**
  11. * 获取超过指定时间的事务集合
  12. *
  13. * @param date 指定时间
  14. * @return 事务集合
  15. */
  16. protected abstract List<Transaction> doFindAllUnmodifiedSince(Date date);
  • 调用 #findAllUnmodifiedSince(...) 方法,从存储器获取超过指定时间的事务集合。调用 #putToCache(...) 方法,循环事务集合添加到缓存。
  • #doFindAllUnmodifiedSince(...) 为抽象方法,子类实现该方法,提供获取超过指定时间的事务集合功能。

3.2 JDBC 事务存储器

org.mengyun.tcctransaction.repository.JdbcTransactionRepository,JDBC 事务存储器,通过 JDBC 驱动,将 Transaction 存储到 MySQL / Oracle / PostgreSQL / SQLServer 等关系数据库。实现代码如下:

  1. public class JdbcTransactionRepository extends CachableTransactionRepository {
  2. /**
  3. * 领域
  4. */
  5. private String domain;
  6. /**
  7. * 表后缀
  8. */
  9. private String tbSuffix;
  10. /**
  11. * 数据源
  12. */
  13. private DataSource dataSource;
  14. /**
  15. * 序列化
  16. */
  17. private ObjectSerializer serializer = new JdkSerializationSerializer();
  18. }
  • domain,领域,或者也可以称为模块名,应用名,用于唯一标识一个资源。例如,Maven 模块 xxx-order,我们可以配置该属性为 ORDER
  • tbSuffix,表后缀。默认存储表名为 TCC_TRANSACTION,配置表名后,为 TCC_TRANSACTION${tbSuffix}
  • dataSource,存储数据的数据源。
  • serializer,序列化。**当数据库里已经有数据的情况下,不要更换别的序列化,否则会导致反序列化报错。**建议:TCC-Transaction 存储时,新增字段,记录序列化的方式。

表结构如下:

  1. CREATE TABLE `TCC_TRANSACTION` (
  2. `TRANSACTION_ID` int(11) NOT NULL AUTO_INCREMENT,
  3. `DOMAIN` varchar(100) DEFAULT NULL,
  4. `GLOBAL_TX_ID` varbinary(32) NOT NULL,
  5. `BRANCH_QUALIFIER` varbinary(32) NOT NULL,
  6. `CONTENT` varbinary(8000) DEFAULT NULL,
  7. `STATUS` int(11) DEFAULT NULL,
  8. `TRANSACTION_TYPE` int(11) DEFAULT NULL,
  9. `RETRIED_COUNT` int(11) DEFAULT NULL,
  10. `CREATE_TIME` datetime DEFAULT NULL,
  11. `LAST_UPDATE_TIME` datetime DEFAULT NULL,
  12. `VERSION` int(11) DEFAULT NULL,
  13. PRIMARY KEY (`TRANSACTION_ID`),
  14. UNIQUE KEY `UX_TX_BQ` (`GLOBAL_TX_ID`,`BRANCH_QUALIFIER`)
  15. ) ENGINE=InnoDB DEFAULT CHARSET=utf8
  • TRANSACTION_ID,仅仅数据库自增,无实际用途。
  • CONTENT,Transaction 序列化。

ps:点击链接查看 JdbcTransactionRepository 代码实现,已经添加完整中文注释。

3.3 Redis 事务存储器

org.mengyun.tcctransaction.repository.RedisTransactionRepository,Redis 事务存储器,将 Transaction 存储到 Redis。实现代码如下:

  1. public class RedisTransactionRepository extends CachableTransactionRepository {
  2. /**
  3. * Jedis Pool
  4. */
  5. private JedisPool jedisPool;
  6. /**
  7. * key 前缀
  8. */
  9. private String keyPrefix = "TCC:";
  10. /**
  11. * 序列化
  12. */
  13. private ObjectSerializer serializer = new JdkSerializationSerializer();
  14. }
  • keyPrefix,key 前缀。类似 JdbcTransactionRepository 的 domain 属性。

一个事务存储到 Reids,使用 Redis 的数据结构为 HASHES。

  • key : 使用 keyPrefix + xid,实现代码如下:
  1. /**
  2. * 创建事务的 Redis Key
  3. *
  4. * @param keyPrefix key 前缀
  5. * @param xid 事务
  6. * @return Redis Key
  7. */
  8. public static byte[] getRedisKey(String keyPrefix, Xid xid) {
  9. byte[] prefix = keyPrefix.getBytes();
  10. byte[] globalTransactionId = xid.getGlobalTransactionId();
  11. byte[] branchQualifier = xid.getBranchQualifier();
  12. // 拼接 key
  13. byte[] key = new byte[prefix.length + globalTransactionId.length + branchQualifier.length];
  14. System.arraycopy(prefix, 0, key, 0, prefix.length);
  15. System.arraycopy(globalTransactionId, 0, key, prefix.length, globalTransactionId.length);
  16. System.arraycopy(branchQualifier, 0, key, prefix.length + globalTransactionId.length, branchQualifier.length);
  17. return key;
  18. }
  • HASHES 的 key :使用 version

  • 添加和更新 Transaction 时,使用 Redis HSETNX,不存在当前版本的值时,进行设置,重而实现类似乐观锁的更新。

  • 读取 Transaction 时,使用 Redis HGETALL,将 Transaction 所有 version 对应的值读取到内存后,取 version 值最大的对应的值。

  • HASHES 的 value :调用 TransactionSerializer#serialize(...) 方法,序列化 Transaction。实现代码如下:

  1. public static byte[] serialize(ObjectSerializer serializer, Transaction transaction) {
  2. Map<String, Object> map = new HashMap<String, Object>();
  3. map.put("GLOBAL_TX_ID", transaction.getXid().getGlobalTransactionId());
  4. map.put("BRANCH_QUALIFIER", transaction.getXid().getBranchQualifier());
  5. map.put("STATUS", transaction.getStatus().getId());
  6. map.put("TRANSACTION_TYPE", transaction.getTransactionType().getId());
  7. map.put("RETRIED_COUNT", transaction.getRetriedCount());
  8. map.put("CREATE_TIME", transaction.getCreateTime());
  9. map.put("LAST_UPDATE_TIME", transaction.getLastUpdateTime());
  10. map.put("VERSION", transaction.getVersion());
  11. // 序列化
  12. map.put("CONTENT", serializer.serialize(transaction));
  13. return serializer.serialize(map);
  14. }

在实现 #doFindAllUnmodifiedSince(date) 方法,无法像数据库使用时间条件进行过滤,因此,加载所有事务后在内存中过滤。实现代码如下: 

  1. @Override
  2. protected List<Transaction> doFindAllUnmodifiedSince(Date date) {
  3. // 获得所有事务
  4. List<Transaction> allTransactions = doFindAll();
  5. // 过滤时间
  6. List<Transaction> allUnmodifiedSince = new ArrayList<Transaction>();
  7. for (Transaction transaction : allTransactions) {
  8. if (transaction.getLastUpdateTime().compareTo(date) < 0) {
  9. allUnmodifiedSince.add(transaction);
  10. }
  11. }
  12. return allUnmodifiedSince;
  13. }

点击链接查看 RedisTransactionRepository 代码实现,已经添加完整中文注释。

FROM 《TCC-Transaction 官方文档 —— 使用指南1.2.x》
使用 RedisTransactionRepository 需要配置 Redis 服务器如下:
appendonly yes
appendfsync always

3.4 Zookeeper 事务存储器

org.mengyun.tcctransaction.repository.ZooKeeperTransactionRepository,Zookeeper 事务存储器,将 Transaction 存储到 Zookeeper。实现代码如下:

  1. public class ZooKeeperTransactionRepository extends CachableTransactionRepository {
  2. /**
  3. * Zookeeper 服务器地址数组
  4. */
  5. private String zkServers;
  6. /**
  7. * Zookeeper 超时时间
  8. */
  9. private int zkTimeout;
  10. /**
  11. * TCC 存储 Zookeeper 根目录
  12. */
  13. private String zkRootPath = "/tcc";
  14. /**
  15. * Zookeeper 连接
  16. */
  17. private volatile ZooKeeper zk;
  18. /**
  19. * 序列化
  20. */
  21. private ObjectSerializer serializer = new JdkSerializationSerializer();
  22. }
  • zkRootPath,存储 Zookeeper 根目录,类似 JdbcTransactionRepository 的 domain 属性。

一个事务存储到 Zookeeper,使用 Zookeeper 的持久数据节点

  • path:${zkRootPath} + / + ${xid}。实现代码如下:
  1. // ZooKeeperTransactionRepository.java
  2. private String getTxidPath(Xid xid) {
  3. return String.format("%s/%s", zkRootPath, xid);
  4. }
  5. // TransactionXid.java
  6. @Override
  7. public String toString() {
  8. StringBuilder stringBuilder = new StringBuilder();
  9. stringBuilder.append("globalTransactionId:").append(UUID.nameUUIDFromBytes(globalTransactionId).toString());
  10. stringBuilder.append(",").append("branchQualifier:").append(UUID.nameUUIDFromBytes(branchQualifier).toString());
  11. return stringBuilder.toString();
  12. }
  • data:调用 TransactionSerializer#serialize(...) 方法,序列化 Transaction。
  • version:使用 Zookeeper 数据节点自带版本功能。这里要注意下,Transaction 的版本从 1 开始,而 Zookeeper 数据节点版本从 0 开始。

ps:点击链接查看 ZooKeeperTransactionRepository 代码实现,已经添加完整中文注释。

另外,在生产上暂时不建议使用 ZooKeeperTransactionRepository,原因有两点:

  • 不支持 Zookeeper 安全认证。
  • 使用 Zookeeper 时,未考虑断网重连等情况。

如果你要使用 Zookeeper 进行事务的存储,可以考虑使用 Apache Curator 操作 Zookeeper,重写 ZooKeeperTransactionRepository 部分代码。

3.5 File 事务存储器

org.mengyun.tcctransaction.repository.FileSystemTransactionRepository,File 事务存储器,将 Transaction 存储到文件系统。

实现上和 ZooKeeperTransactionRepository,区别主要在于不支持乐观锁更新。有兴趣的同学点击链接查看,这里就不拓展开来。

另外,在生产上不建议使用 FileSystemTransactionRepository,因为不支持多节点共享。用分布式存储挂载文件另说,当然还是不建议,因为不支持乐观锁并发更新。

相关文章