文章23 | 阅读 16345 | 点赞0
本文基于seata 1.3.0版本
之前在介绍分支事务注册的时候需要对记录主键加锁,这里加锁使用便是LockManager的实现类FileLockManager。本文将详细介绍seata的锁管理器。
锁管理器的顶级接口是LockManager,锁管理器必须要实现该接口。
public interface LockManager {
/** * 申请锁 */
boolean acquireLock(BranchSession branchSession) throws TransactionException;
/** * 释放锁 */
boolean releaseLock(BranchSession branchSession) throws TransactionException;
/** * 释放全局事务中每个分支事务的锁 */
boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException;
/** * 查看是否对lockKey加了锁 */
boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException;
/** * 清理所有的锁,该方法还没有调用点 */
void cleanAllLocks() throws TransactionException;
}
LockManager的方法相对比较简单,看方法名就可以知道其作用了。
seata提供了LockManager的抽象实现类AbstractLockManager,如果我们自定义一个锁管理器,可以通过对该抽象类扩展。
除了释放锁和锁查询之外,AbstractLockManager的其他关键方法在文章《Seata解析-TC处理全局事务和分支事务原理详解之全局事务开启和分支事务注册》已经做过了介绍,本文不再介绍。下面看一下释放锁和锁查询这两个方法:
public boolean releaseLock(BranchSession branchSession) throws TransactionException {
if (branchSession == null) {
throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
}
List<RowLock> locks = collectRowLocks(branchSession);
try {
return getLocker(branchSession).releaseLock(locks);
} catch (Exception t) {
LOGGER.error("unLock error, branchSession:{}", branchSession, t);
return false;
}
}
@Override
public boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException {
if (StringUtils.isBlank(lockKey)) {
// no lock
return true;
}
List<RowLock> locks = collectRowLocks(lockKey, resourceId, xid);
try {
return getLocker().isLockable(locks);
} catch (Exception t) {
LOGGER.error("isLockable error, xid:{} resourceId:{}, lockKey:{}", xid, resourceId, lockKey, t);
return false;
}
}
这两个方法都是先调用collectRowLocks方法解析lockKey,lockKey的规则在之前的文章里面也已经介绍过。解析后调用getLocker(),由该方法返回的对象进行后续处理。
这个getLocker()方法是本类的抽象方法,由子类去实现。目前AbstractLockManager有三个子类,分别是RedisLockManager、DataBaseLockManager、FileLockManager,其中FileLockManager最简单。
FileLockManager继承自AbstractLockManager。
FileLockManager非常简单,只有两个方法:
public Locker getLocker(BranchSession branchSession) {
return new FileLocker(branchSession);
}
@Override
public boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException {
//遍历全局事务下的每个分支事务,找到分支事务加的锁,然后释放
//释放锁调用的也是FileLocker的方法
ArrayList<BranchSession> branchSessions = globalSession.getBranchSessions();
boolean releaseLockResult = true;
for (BranchSession branchSession : branchSessions) {
//调用父类的releaseLock
if (!this.releaseLock(branchSession)) {
releaseLockResult = false;
}
}
return releaseLockResult;
}
FileLockManager实现了父类的getLocker(),通过之前的代码介绍可以看到,几乎所有与锁相关的方法最后都需要调用getLocker()。
在FileLockManager中,getLocker返回对象FileLocker。下面介绍一下FileLocker类。
下面是FileLocker的构造方法,从构造方法可以看出,该类只处理BranchSession,换句话说该类只用于对数据库记录的主键加锁:
public FileLocker(BranchSession branchSession) {
this.branchSession = branchSession;
}
该类的一些方法之前也介绍过,这里只介绍它的一个属性LOCK_MAP:
private static final ConcurrentMap<String/* resourceId */, ConcurrentMap<String/* tableName */, ConcurrentMap<Integer/* bucketId */, BucketLockMap>>>
LOCK_MAP = new ConcurrentHashMap<>();
LOCK_MAP是static的,该属性是一个全局属性。LOCK_MAP的作用是记录TC端所有的数据库记录锁,其存储的数据映射结构是:
数据库资源->表名->桶->BucketLockMap
数据库资源指的是数据库,对记录加锁时,seata会计算记录主键的哈希值,然后将其分散到128个桶中,BucketLockMap可以简单的理解为Map对象,是记录主键值与transactionId的对应关系,如果某个记录属于1号桶,那么就将该记录的主键值和加锁的transactionId添加到1号桶的BucketLockMap对象中。
每个分支事务注册的时候如果对记录加锁,那么就要在LOCK_MAP中增加一条锁记录。客户端查询某个记录是否加锁时,也是查看LOCK_MAP。
因为LOCK_MAP存放在内存中,所以FileLocker提供的其实是内存类型的锁。
DataBaseLockManager是将锁信息存储到了数据库中,因此需要首先在数据库中建一张记录锁的表,该表包含的字段如下:
xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified
其他字段看名字就可以知道其含义,这里只介绍后面几个字段的含义:
具体的建表语句可以在script模块的server/db目录下找到。
表名默认是lock_table,可以通过store.db.lockTable设置其他表名。
下面来看一下DataBaseLockManager的初始化方法:
//init方法在SPI创建对象后初始化时调用的,SPI会自动初始化实现Initialize接口的类
public void init() {
// init dataSource
//datasourceType表示访问数据库,使用哪种连接池,seata提供了三种:dbcp,druid,hihari
String datasourceType = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
//DataSourceProvider主要作用是创建出指定类型的DataSource对象
DataSource lockStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
//创建数据库锁对象
locker = new DataBaseLocker(lockStoreDataSource);
}
从上面的代码可以看出,DataBaseLockManager对记录加锁使用的是DataBaseLocker对象。
下面是DataBaseLocker的构造方法:
public DataBaseLocker(DataSource logStoreDataSource) {
lockStore = new LockStoreDataBaseDAO(logStoreDataSource);
}
在构造方法中又创建了LockStoreDataBaseDAO,其实这个LockStoreDataBaseDAO对象类似于FileLocker的LOCK_MAP的作用,DataBaseLocker存取锁到数据库都是通过LockStoreDataBaseDAO完成的,所以下面重点分析LockStoreDataBaseDAO。
public LockStoreDataBaseDAO(DataSource lockStoreDataSource) {
//数据源
this.lockStoreDataSource = lockStoreDataSource;
//存储锁的表名,默认是lock_table
//可以通过store.db.lockTable设置表名
lockTable = CONFIG.getConfig(ConfigurationKeys.LOCK_DB_TABLE, DEFAULT_LOCK_DB_TABLE);
//dbType表示使用的是什么数据库,可以mysql,oracle等
//通过store.db.dbType设置
dbType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_TYPE);
if (StringUtils.isBlank(dbType)) {
throw new StoreException("there must be db type.");
}
if (lockStoreDataSource == null) {
throw new StoreException("there must be lockStoreDataSource.");
}
}
本文只分析一下加锁的流程。
加锁是通过acquireLock方法完成的。
首先将加锁信息的RowLock对象转化为LockDO对象,LockDO对象的属性与LOCK_TABLE的字段一一对应,转换之后根据LockDO对象的rowKey查询表LOCK_TABLE是否已经有加锁数据,如果没有加锁或者加锁的事务是当前事务,那么认为可以加锁,接下来将LockDO对象插入表LOCK_TABLE中,否则认为加锁失败,将刚才加过的锁解锁,然后返回加锁失败。下面看一下代码:
//convertToLockDO是将RowLock对象转化为LockDO对象
protected List<LockDO> convertToLockDO(List<RowLock> locks) {
List<LockDO> lockDOs = new ArrayList<>();
if (CollectionUtils.isEmpty(locks)) {
return lockDOs;
}
for (RowLock rowLock : locks) {
LockDO lockDO = new LockDO();
lockDO.setBranchId(rowLock.getBranchId());
lockDO.setPk(rowLock.getPk());
lockDO.setResourceId(rowLock.getResourceId());
lockDO.setRowKey(getRowKey(rowLock.getResourceId(), rowLock.getTableName(), rowLock.getPk()));
lockDO.setXid(rowLock.getXid());
lockDO.setTransactionId(rowLock.getTransactionId());
lockDO.setTableName(rowLock.getTableName());
lockDOs.add(lockDO);
}
return lockDOs;
}
public boolean acquireLock(List<LockDO> lockDOs) {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
Set<String> dbExistedRowKeys = new HashSet<>();
boolean originalAutoCommit = true;
if (lockDOs.size() > 1) {
//过滤掉重复的加锁记录
lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
}
try {
//获得连接
conn = lockStoreDataSource.getConnection();
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
//check lock
//使用StringJoiner对象构建SQL语句的部分内容,分隔符是“,”
StringJoiner sj = new StringJoiner(",");
for (int i = 0; i < lockDOs.size(); i++) {
sj.add("?");
}
boolean canLock = true;
//query
//构建检查当前记录是否已经加锁的SQL语句
//checkLockSQL=select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified
// from lock_table where row_key in ('?','?','?')
String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, sj.toString());
ps = conn.prepareStatement(checkLockSQL);
for (int i = 0; i < lockDOs.size(); i++) {
ps.setString(i + 1, lockDOs.get(i).getRowKey());
}
//执行上面的checkLockSQL语句
rs = ps.executeQuery();
String currentXID = lockDOs.get(0).getXid();
//遍历数据库的查询结果,将当前事务的XID与查询结果中每条记录的XID进行比较,
//查询结果中每条记录的XID表示之前已经有事务XID对该条记录加过锁了
//如果记录已经被其他事务加锁,则设置canLock=false
while (rs.next()) {
String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
if (!StringUtils.equals(dbXID, currentXID)) {
if (LOGGER.isInfoEnabled()) {
String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
Long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID,
dbBranchId);
}
canLock &= false;
break;
}
//记录所有已经加过锁的row_key
dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
}
if (!canLock) {
//如果有记录已经加过锁,则回滚退出
conn.rollback();
return false;
}
List<LockDO> unrepeatedLockDOs = null;
if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
.collect(Collectors.toList());
} else {
unrepeatedLockDOs = lockDOs;
}
if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
//表示都已经加过锁,无需再加锁
conn.rollback();
return true;
}
//lock
//将LockDO对象插入到数据库中
if (unrepeatedLockDOs.size() == 1) {
LockDO lockDO = unrepeatedLockDOs.get(0);
if (!doAcquireLock(conn, lockDO)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
}
conn.rollback();
return false;
}
} else {
if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
}
conn.rollback();
return false;
}
}
//数据库提交
conn.commit();
return true;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(rs, ps);
if (conn != null) {
try {
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
conn.close();
} catch (SQLException e) {
}
}
}
}
RedisLockManager的整体的结构与之前介绍DataBaseLockManager比较类似。
本小节只说明其加锁的流程。
首先也是将RowLock对象转化为LockDO,然后将“SEATA_LOCK_”与每个LockDO对象的rowLock属性拼接在一起作为redis的key,查看是否redis中是否已经有了该key,如果没有说明没有加锁,则将LockDO对象转化为JSON格式作为value连同上面的key一起存入redis。如果有key说明已经加锁了,则返回加锁失败。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_38308374/article/details/108545230
内容来源于网络,如有侵权,请联系作者删除!