Seata解析-TC处理全局事务和分支事务原理详解之全局事务锁状态查询请求和全局事务状态查询请求

x33g5p2x  于2021-12-21 转载在 其他  
字(4.1k)|赞(0)|评价(0)|浏览(482)

本文基于seata 1.3.0版本

一、全局事务锁状态查询请求

全局事务锁状态查询请求的消息类型是MessageType.TYPE_GLOBAL_LOCK_QUERY,请求对象为GlobalLockQueryRequest,该请求由RM发起。该请求的作用是查询一条或多条记录是否上锁。
RM会将表名、需要查询是否上锁的主键值组装成一个字符串发送过来,TC解析字符串,每条记录创建一个RowLock对象,然后从LOCK_MAP中查看记录加锁情况,并将是否上锁的结果返回至RM。其中RM组装字符串的规则和分支事务注册请求中记录要加锁的lockKey组装规则是一致的,规则如下:
表名:主键值_主键值;表名:主键值_主键值

自然根据字符串创建RowLock对象的处理逻辑也是和分支事务注册请求的处理是一致的。
下面具体看一下代码逻辑,下面代码是DefaultCoordinator.doLockCheck方法:

  1. protected void doLockCheck(GlobalLockQueryRequest request, GlobalLockQueryResponse response, RpcContext rpcContext)
  2. throws TransactionException {
  3. response.setLockable(
  4. core.lockQuery(request.getBranchType(), request.getResourceId(), request.getXid(), request.getLockKey()));
  5. }

doLockCheck调用了DefaultCore的lockQuery方法:

  1. public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys)
  2. throws TransactionException {
  3. //branchType=AT,因为现在使用的是AT模式,getCore(branchType)返回的对象是ATCore
  4. return getCore(branchType).lockQuery(branchType, resourceId, xid, lockKeys);
  5. }
  6. //下面代码是ATCore的lockQuery方法:
  7. public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys)
  8. throws TransactionException {
  9. return lockManager.isLockable(xid, resourceId, lockKeys);
  10. }

lockQuery方法中调用了LockManager的方法,LockManager根据配置文件中配置的store.mode参数的不同,而使用不同的对象,这里配置的是file,LockManager的实现类是FileLockManager,store.mode还可以配置db,redis,至于他们之间的区别,后面文章在做介绍。
下面看一下FileLockManager的isLockable方法:

  1. //入参lockKey就是根据上面介绍的字符串组装规则形成的字符串,lockKey是请求报文里面的,可以参见DefaultCoordinator.doLockCheck方法
  2. //isLockable方法返回true表示没有加锁,返回false表示已经加锁
  3. public boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException {
  4. if (StringUtils.isBlank(lockKey)) {
  5. return true;
  6. }
  7. //collectRowLocks方法的处理过程与分支事务注册请求中是一样的,这里不再做介绍
  8. //可以参见[《Seata解析-TC处理全局事务和分支事务原理详解之全局事务开启和分支事务注册》](https://blog.csdn.net/weixin_38308374/article/details/108457173)
  9. List<RowLock> locks = collectRowLocks(lockKey, resourceId, xid);
  10. try {
  11. //LockManager的getLocker()方法用于创建FileLocker对象
  12. return getLocker().isLockable(locks);
  13. } catch (Exception t) {
  14. LOGGER.error("isLockable error, xid:{} resourceId:{}, lockKey:{}", xid, resourceId, lockKey, t);
  15. return false;
  16. }
  17. }

isLockable方法最后是调用FileLocker的isLockable方法:

  1. public boolean isLockable(List<RowLock> rowLocks) {
  2. if (CollectionUtils.isEmpty(rowLocks)) {
  3. //no lock
  4. //没有加锁
  5. return true;
  6. }
  7. //全局事务ID
  8. Long transactionId = rowLocks.get(0).getTransactionId();
  9. //资源ID,也就是RM访问的数据库URL
  10. String resourceId = rowLocks.get(0).getResourceId();
  11. //LOCK_MAP里面存储了所有加锁的数据库数据
  12. ConcurrentMap<String, ConcurrentMap<Integer, BucketLockMap>> dbLockMap = LOCK_MAP.get(resourceId);
  13. if (dbLockMap == null) {
  14. //如果dbLockMap为null,表示尚未有事务对该数据库加锁
  15. return true;
  16. }
  17. //遍历每个数据库记录
  18. for (RowLock rowLock : rowLocks) {
  19. String xid = rowLock.getXid();
  20. String tableName = rowLock.getTableName();
  21. String pk = rowLock.getPk();//pk是主键值
  22. ConcurrentMap<Integer, BucketLockMap> tableLockMap = dbLockMap.get(tableName);
  23. if (tableLockMap == null) {
  24. //tableLockMap为null,表示该表中没有记录被加锁
  25. continue;
  26. }
  27. //根据主键值计算桶位
  28. int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
  29. BucketLockMap bucketLockMap = tableLockMap.get(bucketId);
  30. if (bucketLockMap == null) {
  31. //bucketLockMap为null,表示该主键值中没有被加锁
  32. continue;
  33. }
  34. //根据主键值从bucketLockMap中取出已经对该主键值加锁的事务ID
  35. Long lockingTransactionId = bucketLockMap.get().get(pk);
  36. if (lockingTransactionId == null || lockingTransactionId.longValue() == transactionId) {
  37. // 如果lockingTransactionId为null,表示没有事务对该主键值加锁
  38. // 如果lockingTransactionId与当前事务ID相同,表示两个事务是同一个事务,可以认为是没有加锁的
  39. continue;
  40. } else {
  41. //如果主键值已经被加锁,则返回false
  42. LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId);
  43. return false;
  44. }
  45. }
  46. return true;
  47. }

isLockable方法里面用到了LOCK_MAP、BucketLockMap、桶,详细请参见文章《Seata解析-TC处理全局事务和分支事务原理详解之全局事务开启和分支事务注册》

二、全局事务状态查询请求

全局事务提交请求的消息类型是MessageType.TYPE_GLOBAL_STATUS,请求对象为GlobalStatusRequest,该请求由TM发起。该请求的作用是查询全局事务的状态,TC直接将事务状态返回。
处理全局事务状态查询请求是通过DefaultCoordinator的doGlobalStatus调用了DefaultCore的getStatus方法,下面看一下这个方法:

  1. public GlobalStatus getStatus(String xid) throws TransactionException {
  2. //第二个入参false表示不查询分支事务
  3. GlobalSession globalSession = SessionHolder.findGlobalSession(xid, false);
  4. if (globalSession == null) {
  5. //为null,表示当前事务不是本seata服务器管理的事务
  6. return GlobalStatus.Finished;
  7. } else {
  8. return globalSession.getStatus();
  9. }
  10. }

可以看到getStatus方法非常简单,根据XID查询出GlobalSession对象,然后返回对象里面的状态即可。

相关文章