Seata解析-锁管理器LockManager详解

x33g5p2x  于2021-12-21 转载在 其他  
字(9.3k)|赞(0)|评价(0)|浏览(687)

本文基于seata 1.3.0版本

之前在介绍分支事务注册的时候需要对记录主键加锁,这里加锁使用便是LockManager的实现类FileLockManager。本文将详细介绍seata的锁管理器。

一、LockManager

锁管理器的顶级接口是LockManager,锁管理器必须要实现该接口。

  1. public interface LockManager {
  2. /** * 申请锁 */
  3. boolean acquireLock(BranchSession branchSession) throws TransactionException;
  4. /** * 释放锁 */
  5. boolean releaseLock(BranchSession branchSession) throws TransactionException;
  6. /** * 释放全局事务中每个分支事务的锁 */
  7. boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException;
  8. /** * 查看是否对lockKey加了锁 */
  9. boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException;
  10. /** * 清理所有的锁,该方法还没有调用点 */
  11. void cleanAllLocks() throws TransactionException;
  12. }

LockManager的方法相对比较简单,看方法名就可以知道其作用了。

二、AbstractLockManager

seata提供了LockManager的抽象实现类AbstractLockManager,如果我们自定义一个锁管理器,可以通过对该抽象类扩展。
除了释放锁和锁查询之外,AbstractLockManager的其他关键方法在文章《Seata解析-TC处理全局事务和分支事务原理详解之全局事务开启和分支事务注册》已经做过了介绍,本文不再介绍。下面看一下释放锁和锁查询这两个方法:

  1. public boolean releaseLock(BranchSession branchSession) throws TransactionException {
  2. if (branchSession == null) {
  3. throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
  4. }
  5. List<RowLock> locks = collectRowLocks(branchSession);
  6. try {
  7. return getLocker(branchSession).releaseLock(locks);
  8. } catch (Exception t) {
  9. LOGGER.error("unLock error, branchSession:{}", branchSession, t);
  10. return false;
  11. }
  12. }
  13. @Override
  14. public boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException {
  15. if (StringUtils.isBlank(lockKey)) {
  16. // no lock
  17. return true;
  18. }
  19. List<RowLock> locks = collectRowLocks(lockKey, resourceId, xid);
  20. try {
  21. return getLocker().isLockable(locks);
  22. } catch (Exception t) {
  23. LOGGER.error("isLockable error, xid:{} resourceId:{}, lockKey:{}", xid, resourceId, lockKey, t);
  24. return false;
  25. }
  26. }

这两个方法都是先调用collectRowLocks方法解析lockKey,lockKey的规则在之前的文章里面也已经介绍过。解析后调用getLocker(),由该方法返回的对象进行后续处理。
这个getLocker()方法是本类的抽象方法,由子类去实现。目前AbstractLockManager有三个子类,分别是RedisLockManager、DataBaseLockManager、FileLockManager,其中FileLockManager最简单。

三、FileLockManager

FileLockManager继承自AbstractLockManager。
FileLockManager非常简单,只有两个方法:

  1. public Locker getLocker(BranchSession branchSession) {
  2. return new FileLocker(branchSession);
  3. }
  4. @Override
  5. public boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException {
  6. //遍历全局事务下的每个分支事务,找到分支事务加的锁,然后释放
  7. //释放锁调用的也是FileLocker的方法
  8. ArrayList<BranchSession> branchSessions = globalSession.getBranchSessions();
  9. boolean releaseLockResult = true;
  10. for (BranchSession branchSession : branchSessions) {
  11. //调用父类的releaseLock
  12. if (!this.releaseLock(branchSession)) {
  13. releaseLockResult = false;
  14. }
  15. }
  16. return releaseLockResult;
  17. }

FileLockManager实现了父类的getLocker(),通过之前的代码介绍可以看到,几乎所有与锁相关的方法最后都需要调用getLocker()。
在FileLockManager中,getLocker返回对象FileLocker。下面介绍一下FileLocker类。

1、FileLocker

下面是FileLocker的构造方法,从构造方法可以看出,该类只处理BranchSession,换句话说该类只用于对数据库记录的主键加锁:

  1. public FileLocker(BranchSession branchSession) {
  2. this.branchSession = branchSession;
  3. }

该类的一些方法之前也介绍过,这里只介绍它的一个属性LOCK_MAP:

  1. private static final ConcurrentMap<String/* resourceId */, ConcurrentMap<String/* tableName */, ConcurrentMap<Integer/* bucketId */, BucketLockMap>>>
  2. LOCK_MAP = new ConcurrentHashMap<>();

LOCK_MAP是static的,该属性是一个全局属性。LOCK_MAP的作用是记录TC端所有的数据库记录锁,其存储的数据映射结构是:

数据库资源->表名->桶->BucketLockMap

数据库资源指的是数据库,对记录加锁时,seata会计算记录主键的哈希值,然后将其分散到128个桶中,BucketLockMap可以简单的理解为Map对象,是记录主键值与transactionId的对应关系,如果某个记录属于1号桶,那么就将该记录的主键值和加锁的transactionId添加到1号桶的BucketLockMap对象中。
每个分支事务注册的时候如果对记录加锁,那么就要在LOCK_MAP中增加一条锁记录。客户端查询某个记录是否加锁时,也是查看LOCK_MAP。
因为LOCK_MAP存放在内存中,所以FileLocker提供的其实是内存类型的锁。

四、DataBaseLockManager

DataBaseLockManager是将锁信息存储到了数据库中,因此需要首先在数据库中建一张记录锁的表,该表包含的字段如下:

  1. xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified

其他字段看名字就可以知道其含义,这里只介绍后面几个字段的含义:

  • pk:主键值
  • row_key:resource_id+table_name+pk的组合
  • gmt_create:当前记录的创建时间
  • gmt_modified:当前记录的最后修改时间

具体的建表语句可以在script模块的server/db目录下找到。
表名默认是lock_table,可以通过store.db.lockTable设置其他表名。
下面来看一下DataBaseLockManager的初始化方法:

  1. //init方法在SPI创建对象后初始化时调用的,SPI会自动初始化实现Initialize接口的类
  2. public void init() {
  3. // init dataSource
  4. //datasourceType表示访问数据库,使用哪种连接池,seata提供了三种:dbcp,druid,hihari
  5. String datasourceType = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
  6. //DataSourceProvider主要作用是创建出指定类型的DataSource对象
  7. DataSource lockStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
  8. //创建数据库锁对象
  9. locker = new DataBaseLocker(lockStoreDataSource);
  10. }

从上面的代码可以看出,DataBaseLockManager对记录加锁使用的是DataBaseLocker对象。
下面是DataBaseLocker的构造方法:

  1. public DataBaseLocker(DataSource logStoreDataSource) {
  2. lockStore = new LockStoreDataBaseDAO(logStoreDataSource);
  3. }

在构造方法中又创建了LockStoreDataBaseDAO,其实这个LockStoreDataBaseDAO对象类似于FileLocker的LOCK_MAP的作用,DataBaseLocker存取锁到数据库都是通过LockStoreDataBaseDAO完成的,所以下面重点分析LockStoreDataBaseDAO。

  1. public LockStoreDataBaseDAO(DataSource lockStoreDataSource) {
  2. //数据源
  3. this.lockStoreDataSource = lockStoreDataSource;
  4. //存储锁的表名,默认是lock_table
  5. //可以通过store.db.lockTable设置表名
  6. lockTable = CONFIG.getConfig(ConfigurationKeys.LOCK_DB_TABLE, DEFAULT_LOCK_DB_TABLE);
  7. //dbType表示使用的是什么数据库,可以mysql,oracle等
  8. //通过store.db.dbType设置
  9. dbType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_TYPE);
  10. if (StringUtils.isBlank(dbType)) {
  11. throw new StoreException("there must be db type.");
  12. }
  13. if (lockStoreDataSource == null) {
  14. throw new StoreException("there must be lockStoreDataSource.");
  15. }
  16. }

本文只分析一下加锁的流程。
加锁是通过acquireLock方法完成的。
首先将加锁信息的RowLock对象转化为LockDO对象,LockDO对象的属性与LOCK_TABLE的字段一一对应,转换之后根据LockDO对象的rowKey查询表LOCK_TABLE是否已经有加锁数据,如果没有加锁或者加锁的事务是当前事务,那么认为可以加锁,接下来将LockDO对象插入表LOCK_TABLE中,否则认为加锁失败,将刚才加过的锁解锁,然后返回加锁失败。下面看一下代码:

  1. //convertToLockDO是将RowLock对象转化为LockDO对象
  2. protected List<LockDO> convertToLockDO(List<RowLock> locks) {
  3. List<LockDO> lockDOs = new ArrayList<>();
  4. if (CollectionUtils.isEmpty(locks)) {
  5. return lockDOs;
  6. }
  7. for (RowLock rowLock : locks) {
  8. LockDO lockDO = new LockDO();
  9. lockDO.setBranchId(rowLock.getBranchId());
  10. lockDO.setPk(rowLock.getPk());
  11. lockDO.setResourceId(rowLock.getResourceId());
  12. lockDO.setRowKey(getRowKey(rowLock.getResourceId(), rowLock.getTableName(), rowLock.getPk()));
  13. lockDO.setXid(rowLock.getXid());
  14. lockDO.setTransactionId(rowLock.getTransactionId());
  15. lockDO.setTableName(rowLock.getTableName());
  16. lockDOs.add(lockDO);
  17. }
  18. return lockDOs;
  19. }
  20. public boolean acquireLock(List<LockDO> lockDOs) {
  21. Connection conn = null;
  22. PreparedStatement ps = null;
  23. ResultSet rs = null;
  24. Set<String> dbExistedRowKeys = new HashSet<>();
  25. boolean originalAutoCommit = true;
  26. if (lockDOs.size() > 1) {
  27. //过滤掉重复的加锁记录
  28. lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
  29. }
  30. try {
  31. //获得连接
  32. conn = lockStoreDataSource.getConnection();
  33. if (originalAutoCommit = conn.getAutoCommit()) {
  34. conn.setAutoCommit(false);
  35. }
  36. //check lock
  37. //使用StringJoiner对象构建SQL语句的部分内容,分隔符是“,”
  38. StringJoiner sj = new StringJoiner(",");
  39. for (int i = 0; i < lockDOs.size(); i++) {
  40. sj.add("?");
  41. }
  42. boolean canLock = true;
  43. //query
  44. //构建检查当前记录是否已经加锁的SQL语句
  45. //checkLockSQL=select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified
  46. // from lock_table where row_key in ('?','?','?')
  47. String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, sj.toString());
  48. ps = conn.prepareStatement(checkLockSQL);
  49. for (int i = 0; i < lockDOs.size(); i++) {
  50. ps.setString(i + 1, lockDOs.get(i).getRowKey());
  51. }
  52. //执行上面的checkLockSQL语句
  53. rs = ps.executeQuery();
  54. String currentXID = lockDOs.get(0).getXid();
  55. //遍历数据库的查询结果,将当前事务的XID与查询结果中每条记录的XID进行比较,
  56. //查询结果中每条记录的XID表示之前已经有事务XID对该条记录加过锁了
  57. //如果记录已经被其他事务加锁,则设置canLock=false
  58. while (rs.next()) {
  59. String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
  60. if (!StringUtils.equals(dbXID, currentXID)) {
  61. if (LOGGER.isInfoEnabled()) {
  62. String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
  63. String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
  64. Long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
  65. LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID,
  66. dbBranchId);
  67. }
  68. canLock &= false;
  69. break;
  70. }
  71. //记录所有已经加过锁的row_key
  72. dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
  73. }
  74. if (!canLock) {
  75. //如果有记录已经加过锁,则回滚退出
  76. conn.rollback();
  77. return false;
  78. }
  79. List<LockDO> unrepeatedLockDOs = null;
  80. if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
  81. unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
  82. .collect(Collectors.toList());
  83. } else {
  84. unrepeatedLockDOs = lockDOs;
  85. }
  86. if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
  87. //表示都已经加过锁,无需再加锁
  88. conn.rollback();
  89. return true;
  90. }
  91. //lock
  92. //将LockDO对象插入到数据库中
  93. if (unrepeatedLockDOs.size() == 1) {
  94. LockDO lockDO = unrepeatedLockDOs.get(0);
  95. if (!doAcquireLock(conn, lockDO)) {
  96. if (LOGGER.isInfoEnabled()) {
  97. LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
  98. }
  99. conn.rollback();
  100. return false;
  101. }
  102. } else {
  103. if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
  104. if (LOGGER.isInfoEnabled()) {
  105. LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
  106. unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
  107. }
  108. conn.rollback();
  109. return false;
  110. }
  111. }
  112. //数据库提交
  113. conn.commit();
  114. return true;
  115. } catch (SQLException e) {
  116. throw new StoreException(e);
  117. } finally {
  118. IOUtil.close(rs, ps);
  119. if (conn != null) {
  120. try {
  121. if (originalAutoCommit) {
  122. conn.setAutoCommit(true);
  123. }
  124. conn.close();
  125. } catch (SQLException e) {
  126. }
  127. }
  128. }
  129. }

五、RedisLockManager

RedisLockManager的整体的结构与之前介绍DataBaseLockManager比较类似。
本小节只说明其加锁的流程。
首先也是将RowLock对象转化为LockDO,然后将“SEATA_LOCK_”与每个LockDO对象的rowLock属性拼接在一起作为redis的key,查看是否redis中是否已经有了该key,如果没有说明没有加锁,则将LockDO对象转化为JSON格式作为value连同上面的key一起存入redis。如果有key说明已经加锁了,则返回加锁失败。

相关文章