文章23 | 阅读 17187 | 点赞0
本文基于seata 1.3.0版本
全局事务锁状态查询请求的消息类型是MessageType.TYPE_GLOBAL_LOCK_QUERY,请求对象为GlobalLockQueryRequest,该请求由RM发起。该请求的作用是查询一条或多条记录是否上锁。
RM会将表名、需要查询是否上锁的主键值组装成一个字符串发送过来,TC解析字符串,每条记录创建一个RowLock对象,然后从LOCK_MAP中查看记录加锁情况,并将是否上锁的结果返回至RM。其中RM组装字符串的规则和分支事务注册请求中记录要加锁的lockKey组装规则是一致的,规则如下:
表名:主键值_主键值;表名:主键值_主键值
自然根据字符串创建RowLock对象的处理逻辑也是和分支事务注册请求的处理是一致的。
下面具体看一下代码逻辑,下面代码是DefaultCoordinator.doLockCheck方法:
protected void doLockCheck(GlobalLockQueryRequest request, GlobalLockQueryResponse response, RpcContext rpcContext)
throws TransactionException {
response.setLockable(
core.lockQuery(request.getBranchType(), request.getResourceId(), request.getXid(), request.getLockKey()));
}
doLockCheck调用了DefaultCore的lockQuery方法:
public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys)
throws TransactionException {
//branchType=AT,因为现在使用的是AT模式,getCore(branchType)返回的对象是ATCore
return getCore(branchType).lockQuery(branchType, resourceId, xid, lockKeys);
}
//下面代码是ATCore的lockQuery方法:
public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys)
throws TransactionException {
return lockManager.isLockable(xid, resourceId, lockKeys);
}
lockQuery方法中调用了LockManager的方法,LockManager根据配置文件中配置的store.mode参数的不同,而使用不同的对象,这里配置的是file,LockManager的实现类是FileLockManager,store.mode还可以配置db,redis,至于他们之间的区别,后面文章在做介绍。
下面看一下FileLockManager的isLockable方法:
//入参lockKey就是根据上面介绍的字符串组装规则形成的字符串,lockKey是请求报文里面的,可以参见DefaultCoordinator.doLockCheck方法
//isLockable方法返回true表示没有加锁,返回false表示已经加锁
public boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException {
if (StringUtils.isBlank(lockKey)) {
return true;
}
//collectRowLocks方法的处理过程与分支事务注册请求中是一样的,这里不再做介绍
//可以参见[《Seata解析-TC处理全局事务和分支事务原理详解之全局事务开启和分支事务注册》](https://blog.csdn.net/weixin_38308374/article/details/108457173)
List<RowLock> locks = collectRowLocks(lockKey, resourceId, xid);
try {
//LockManager的getLocker()方法用于创建FileLocker对象
return getLocker().isLockable(locks);
} catch (Exception t) {
LOGGER.error("isLockable error, xid:{} resourceId:{}, lockKey:{}", xid, resourceId, lockKey, t);
return false;
}
}
isLockable方法最后是调用FileLocker的isLockable方法:
public boolean isLockable(List<RowLock> rowLocks) {
if (CollectionUtils.isEmpty(rowLocks)) {
//no lock
//没有加锁
return true;
}
//全局事务ID
Long transactionId = rowLocks.get(0).getTransactionId();
//资源ID,也就是RM访问的数据库URL
String resourceId = rowLocks.get(0).getResourceId();
//LOCK_MAP里面存储了所有加锁的数据库数据
ConcurrentMap<String, ConcurrentMap<Integer, BucketLockMap>> dbLockMap = LOCK_MAP.get(resourceId);
if (dbLockMap == null) {
//如果dbLockMap为null,表示尚未有事务对该数据库加锁
return true;
}
//遍历每个数据库记录
for (RowLock rowLock : rowLocks) {
String xid = rowLock.getXid();
String tableName = rowLock.getTableName();
String pk = rowLock.getPk();//pk是主键值
ConcurrentMap<Integer, BucketLockMap> tableLockMap = dbLockMap.get(tableName);
if (tableLockMap == null) {
//tableLockMap为null,表示该表中没有记录被加锁
continue;
}
//根据主键值计算桶位
int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
BucketLockMap bucketLockMap = tableLockMap.get(bucketId);
if (bucketLockMap == null) {
//bucketLockMap为null,表示该主键值中没有被加锁
continue;
}
//根据主键值从bucketLockMap中取出已经对该主键值加锁的事务ID
Long lockingTransactionId = bucketLockMap.get().get(pk);
if (lockingTransactionId == null || lockingTransactionId.longValue() == transactionId) {
// 如果lockingTransactionId为null,表示没有事务对该主键值加锁
// 如果lockingTransactionId与当前事务ID相同,表示两个事务是同一个事务,可以认为是没有加锁的
continue;
} else {
//如果主键值已经被加锁,则返回false
LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId);
return false;
}
}
return true;
}
isLockable方法里面用到了LOCK_MAP、BucketLockMap、桶,详细请参见文章《Seata解析-TC处理全局事务和分支事务原理详解之全局事务开启和分支事务注册》
全局事务提交请求的消息类型是MessageType.TYPE_GLOBAL_STATUS,请求对象为GlobalStatusRequest,该请求由TM发起。该请求的作用是查询全局事务的状态,TC直接将事务状态返回。
处理全局事务状态查询请求是通过DefaultCoordinator的doGlobalStatus调用了DefaultCore的getStatus方法,下面看一下这个方法:
public GlobalStatus getStatus(String xid) throws TransactionException {
//第二个入参false表示不查询分支事务
GlobalSession globalSession = SessionHolder.findGlobalSession(xid, false);
if (globalSession == null) {
//为null,表示当前事务不是本seata服务器管理的事务
return GlobalStatus.Finished;
} else {
return globalSession.getStatus();
}
}
可以看到getStatus方法非常简单,根据XID查询出GlobalSession对象,然后返回对象里面的状态即可。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_38308374/article/details/108532122
内容来源于网络,如有侵权,请联系作者删除!