Seata解析-TC处理全局事务和分支事务原理详解之全局事务提交请求和全局事务回滚请求

x33g5p2x  于2021-12-21 转载在 其他  
字(7.4k)|赞(0)|评价(0)|浏览(649)

本文基于seata 1.3.0版本

本文继续介绍TC对请求的处理。
本文将分析全局事务提交请求和全局事务回滚请求的处理逻辑。

一、全局事务提交请求

全局事务提交请求的消息类型是MessageType.TYPE_GLOBAL_COMMIT,请求对象为GlobalCommitRequest,该请求由TM发起。该请求的作用是报告某个全局事务执行完毕请求提交。TC收到后要做出以下更改:

  1. 对全局事务对象GlobalSession加锁;
  2. 释放分支事务锁,修改GlobalSession的状态为提交中;
  3. 释放GlobalSession锁;
  4. 当前处于AT模式,事务可以异步提交,将当前全局事务添加到异步提交管理器;
  5. 修改全局事务状态为异步提交中;
  6. 返回TM当前全局事务状态。

这里简单介绍一下异步提交管理器后续对全局事务做的处理是:通知各个分支事务,要求分支事务提交,如果分支事务提交都成功了,则修改全局事务状态为提交成功,如果有失败的,则重试通知。异步提交管理器后面有文章专门做介绍。

从上面的描述可以看出,全局事务提交请求主要是释放分支事务锁,通知分支事务提交,最后修改全局事务状态。
下面来看一下代码逻辑。下面直接从DefaultCore的commit方法开始介绍。

  1. public GlobalStatus commit(String xid) throws TransactionException {
  2. //根据XID找到全局事务对象GlobalSession
  3. GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
  4. if (globalSession == null) {
  5. //如果GlobalSession没有找到,说明当前事务是非seata管理的
  6. return GlobalStatus.Finished;
  7. }
  8. //添加监听器
  9. globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
  10. // just lock changeStatus
  11. //shouldCommit表示当前事务是否可以提交,如果事务状态为GlobalStatus.Begin,则可以提交,否则不可以
  12. boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
  13. // the lock should release after branch commit
  14. // Highlight: Firstly, close the session, then no more branch can be registered.
  15. //closeAndClean:将关闭消息通知上面注册的监听器,然后释放分支事务锁
  16. globalSession.closeAndClean();
  17. //如果事务状态为开始,那么将状态直接改为提交中
  18. if (globalSession.getStatus() == GlobalStatus.Begin) {
  19. globalSession.changeStatus(GlobalStatus.Committing);
  20. return true;
  21. }
  22. return false;
  23. });
  24. if (!shouldCommit) {
  25. //返回当前事务状态,表示禁止全局事务提交,事务提交失败
  26. return globalSession.getStatus();
  27. }
  28. //遍历分支事务是否可以异步提交,如果分支事务有TCC或者XA的,则不能异步提交
  29. //本文介绍的场景都是AT模式的,因此globalSession.canBeCommittedAsync()返回true
  30. //下面只介绍if分支
  31. if (globalSession.canBeCommittedAsync()) {
  32. //执行异步提交
  33. globalSession.asyncCommit();
  34. //返回事务状态为提交成功
  35. return GlobalStatus.Committed;
  36. } else {
  37. //执行同步提交
  38. doGlobalCommit(globalSession, false);
  39. }
  40. //返回最终事务状态
  41. return globalSession.getStatus();
  42. }

上面代码使用到了globalSession.asyncCommit方法,下面来看一下这个方法:

  1. public void asyncCommit() throws TransactionException {
  2. //添加监听器
  3. this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());
  4. //将当前全局事务添加到异步提交管理器,异步提交管理器后续文章介绍
  5. SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);
  6. //修改事务状态为异步提交中
  7. this.changeStatus(GlobalStatus.AsyncCommitting);
  8. }

从asyncCommit方法可以看出在AT模式下,通知分支事务提交委托给异步提交管理器去完成了。
不知道大家有没有一个疑问,为什么不先通知分支事务提交,之后根据提交结果再释放分支事务锁?
这里我理解是,既然TM通知全局事务提交了,那么说明当前事务的执行过程是成功的,没有错误,而且分支事务在执行结束后就已经提交了,也就是在全局事务提交前,分支事务对数据的改变就已经写入数据库了,TC通知分支事务提交是为了处理回滚日志等,这些处理与业务处理关系不大,即使分支事务提交失败也不会有影响,但是分支事务加的锁就不同了,如果锁一直加着,就会影响其他事务的执行,严重可能造成大量事务执行失败,所以先释放锁,让其他需要锁的事务可以正常执行,至于分支事务提交可以异步进行,即使失败也没有影响。

二、全局事务回滚请求

全局事务回滚请求的消息类型是MessageType.TYPE_GLOBAL_ROLLBACK,请求对象为GlobalRollbackRequest,该请求由TM发起。该请求的作用是回滚全局事务。TC收到后要做出以下更改:

  1. 全局事务对象加锁;
  2. 修改全局事务状态为回滚中;
  3. 全局事务对象解锁;
  4. 通知各个分支事务回滚;
  5. 如果有分支事务回顾失败的,则通知TM回滚失败,如果分支事务全部回滚成功,则更改全局事务状态为回滚成功,并且释放分支事务锁。

对于上述流程,有个问题就是对于回滚失败的分支事务怎么处理,因为一个全局事务包含多个分支事务,可能有的回滚成功了,有的回滚失败,这样会造成事务的不一致,这个问题等到介绍TM的时候在分析。
下面看一下具体的代码实现,从DefaultCore的rollback方法看起:

  1. public GlobalStatus rollback(String xid) throws TransactionException {
  2. //获得XID对应的GlobalSession
  3. GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
  4. if (globalSession == null) {
  5. //如果没有找到,说明当前事务不是seat管理的
  6. return GlobalStatus.Finished;
  7. }
  8. //添加监听器
  9. globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
  10. // just lock changeStatus
  11. //lockAndExecute方法会对全局事务对象GlobalSession加锁,然后执行里面的回调方法,
  12. //回调成功后,再解锁
  13. boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
  14. //close方法是将关闭事件通知给之前注册的监听器
  15. globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
  16. if (globalSession.getStatus() == GlobalStatus.Begin) {
  17. //修改状态为回滚中
  18. globalSession.changeStatus(GlobalStatus.Rollbacking);
  19. return true;
  20. }
  21. return false;
  22. });
  23. if (!shouldRollBack) {
  24. return globalSession.getStatus();
  25. }
  26. doGlobalRollback(globalSession, false);
  27. return globalSession.getStatus();
  28. }
  29. public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
  30. boolean success = true;
  31. // start rollback event
  32. //发布事务回滚中的事件
  33. eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
  34. globalSession.getTransactionName(), globalSession.getBeginTime(), null, globalSession.getStatus()));
  35. //当前使用的模式是AT模式,因此只执行else分支
  36. if (globalSession.isSaga()) {
  37. success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
  38. } else {
  39. //遍历全局事务下的每个分支事务
  40. for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
  41. BranchStatus currentBranchStatus = branchSession.getStatus();
  42. if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
  43. //如果分支事务在一阶段失败了,说明事务更改没有写入数据库,
  44. //则分支事务无需做操作,将该分支事务从全局事务中删除即可
  45. globalSession.removeBranch(branchSession);
  46. continue;
  47. }
  48. try {
  49. //branchRollback:构建BranchRollbackRequest请求对象,通知分支事务做回滚
  50. BranchStatus branchStatus = branchRollback(globalSession, branchSession);
  51. switch (branchStatus) {
  52. case PhaseTwo_Rollbacked:
  53. //如果二阶段回退成功,则直接将分支事务从全局事务中删除
  54. globalSession.removeBranch(branchSession);
  55. LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
  56. continue;
  57. case PhaseTwo_RollbackFailed_Unretryable:
  58. //事务回滚失败,则修改全局事务状态为回滚失败,并且释放所有的分支事务锁
  59. //然后给TM返回回滚失败
  60. SessionHelper.endRollbackFailed(globalSession);
  61. LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
  62. return false;
  63. default:
  64. LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
  65. if (!retrying) {
  66. //将全局事务加入到回滚重试管理器
  67. globalSession.queueToRetryRollback();
  68. }
  69. return false;
  70. }
  71. } catch (Exception ex) {
  72. StackTraceLogger.error(LOGGER, ex,
  73. "Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
  74. new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
  75. if (!retrying) {
  76. //如果异常了,则将全局事务加入到回滚重试管理器
  77. globalSession.queueToRetryRollback();
  78. }
  79. throw new TransactionException(ex);
  80. }
  81. }
  82. // In db mode, there is a problem of inconsistent data in multiple copies, resulting in new branch
  83. // transaction registration when rolling back.
  84. // 1. New branch transaction and rollback branch transaction have no data association
  85. // 2. New branch transaction has data association with rollback branch transaction
  86. // The second query can solve the first problem, and if it is the second problem, it may cause a rollback
  87. // failure due to data changes.
  88. //能进行到这个位置,说明分支事务全部回滚成功了,GlobalSession中已经没有分支事务了
  89. //根据注释,增加下面的代码应该是在db模式下出过问题,但是新分支注册会对全局事务对象加锁,回滚也会加锁,
  90. //所以回滚时,不会发生新的分支事务注册情况,对于上面的注释大家如果知道原因烦请指导
  91. GlobalSession globalSessionTwice = SessionHolder.findGlobalSession(globalSession.getXid());
  92. if (globalSessionTwice != null && globalSessionTwice.hasBranch()) {
  93. LOGGER.info("Rollbacking global transaction is NOT done, xid = {}.", globalSession.getXid());
  94. return false;
  95. }
  96. }
  97. //success永远都是true
  98. if (success) {
  99. //endRollbacked更改全局事务状态为回滚成功,并且释放分支事务锁
  100. SessionHelper.endRollbacked(globalSession);
  101. // rollbacked event
  102. //发布回滚成功事件
  103. eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
  104. globalSession.getTransactionName(), globalSession.getBeginTime(), System.currentTimeMillis(),
  105. globalSession.getStatus()));
  106. LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
  107. }
  108. return success;
  109. }

全局事务回滚请求的代码逻辑和全局事务提交请求的逻辑比较类似。
全局事务提交请求和全局事务回滚请求的代码里面都在GlobalSession对象中添加了监听器,然后在执行下面这行代码时,会通知监听器:

  1. globalSession.close();
  2. //globalSession的close方法
  3. public void close() throws TransactionException {
  4. if (active) {
  5. for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
  6. lifecycleListener.onClose(this);
  7. }
  8. }
  9. }

下面我们看一下监听器的处理逻辑:

  1. public void onClose(GlobalSession globalSession) throws TransactionException {
  2. globalSession.setActive(false);
  3. }

监听器的处理还是非常简单的,只是将globalSession对象有激活状态改为非激活状态,改为非激活状态后,新分支事务就不能在注册了。

相关文章