Seata解析-TC处理全局事务和分支事务原理详解之分支状态报告请求和全局事务报告请求

x33g5p2x  于2021-12-21 转载在 其他  
字(8.3k)|赞(0)|评价(0)|浏览(701)

本文基于seata 1.3.0版本

本文接文章《Seata解析-TC处理全局事务和分支事务原理详解之全局事务开启和分支事务注册》继续介绍TC对请求的处理。本文将介绍分支状态报告请求和全局事务报告请求。

一、分支状态报告请求

分支状态报告请求的消息类型是MessageType.TYPE_BRANCH_STATUS_REPORT,请求对象为BranchReportRequest,该请求由RM发起。该请求的作用是报告某个分支事务的状态,TC收到后更改对应BranchSession对象的状态。
处理该请求通过模板方法调用了DefaultCoordinator.doBranchReport:

  1. protected void doBranchReport(BranchReportRequest request, BranchReportResponse response, RpcContext rpcContext)
  2. throws TransactionException {
  3. //调用DefaultCore的branchReport方法
  4. core.branchReport(request.getBranchType(), request.getXid(), request.getBranchId(), request.getStatus(),
  5. request.getApplicationData());
  6. }

在doBranchReport方法中又调用了DefaultCore的branchReport方法,在branchReport方法中又调用了其他方法,最终调用到ATCore的branchReport方法:

  1. public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status,
  2. String applicationData) throws TransactionException {
  3. //根据XID得到全局事务对象GlobalSession
  4. GlobalSession globalSession = assertGlobalSessionNotNull(xid, true);
  5. //根据branchId得到分支事务对象branchSession
  6. BranchSession branchSession = globalSession.getBranch(branchId);
  7. if (branchSession == null) {
  8. throw new BranchTransactionException(BranchTransactionNotExist,
  9. String.format("Could not found branch session xid = %s branchId = %s", xid, branchId));
  10. }
  11. //添加监听器
  12. globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
  13. //更改分支事务状态,本请求的处理核心在下面这个方法中
  14. globalSession.changeBranchStatus(branchSession, status);
  15. if (LOGGER.isInfoEnabled()) {
  16. LOGGER.info("Report branch status successfully, xid = {}, branchId = {}", globalSession.getXid(),
  17. branchSession.getBranchId());
  18. }
  19. }

branchReport方法在最后调用了GlobalSession的changeBranchStatus方法,这个方法也非常简单,就是直接更改了分支事务的状态:

  1. public void changeBranchStatus(BranchSession branchSession, BranchStatus status)
  2. throws TransactionException {
  3. //设置分支事务对象的状态
  4. branchSession.setStatus(status);
  5. //通知监听器,监听器的处理后续文章介绍
  6. for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
  7. lifecycleListener.onBranchStatusChange(this, branchSession, status);
  8. }
  9. }

分支状态报告请求的处理还是比较简单的,总起来说就是找到分支事务对象BranchSession,然后直接修改其状态。

二、全局事务报告请求

全局事务报告请求的消息类型是MessageType.TYPE_GLOBAL_REPORT,请求对象为GlobalReportRequest,该请求由TM发起。该请求的作用是报告某个全局事务的状态,TC收到后更改该全局事务的状态。
全局事务报告请求的处理通过模板方法调用了协调器对象DefaultCoordinator的doGlobalReport方法,之后在doGlobalReport方法里面进一步调用到DefaultCore的globalReport方法:

  1. public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {
  2. //根据XID找到全局事务
  3. GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
  4. if (globalSession == null) {
  5. return globalStatus;
  6. }
  7. //添加监听器
  8. globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
  9. doGlobalReport(globalSession, xid, globalStatus);
  10. return globalSession.getStatus();
  11. }
  12. @Override
  13. public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus globalStatus) throws TransactionException {
  14. if (globalSession.isSaga()) {
  15. //getCore(BranchType.SAGA)方法返回的对象是SagaCore,现在使用的是AT模式,不知道为什么调用SagaCore对象
  16. getCore(BranchType.SAGA).doGlobalReport(globalSession, xid, globalStatus);
  17. }
  18. }

globalReport和doGlobalReport两个方法也非常简单,就是找到全局事务对象,然后调用SagaCore的doGlobalReport方法:

  1. public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus globalStatus) throws TransactionException {
  2. //全局事务提交
  3. if (GlobalStatus.Committed.equals(globalStatus)) {
  4. removeAllBranches(globalSession);
  5. SessionHelper.endCommitted(globalSession);
  6. LOGGER.info("Global[{}] committed", globalSession.getXid());
  7. }
  8. //全局事务回滚,Finished未知,等分析TM的时候在介绍该状态
  9. else if (GlobalStatus.Rollbacked.equals(globalStatus)
  10. || GlobalStatus.Finished.equals(globalStatus)) {
  11. removeAllBranches(globalSession);
  12. SessionHelper.endRollbacked(globalSession);
  13. LOGGER.info("Global[{}] rollbacked", globalSession.getXid());
  14. }
  15. //其他状态
  16. else {
  17. globalSession.changeStatus(globalStatus);
  18. LOGGER.info("Global[{}] reporting is successfully done. status[{}]", globalSession.getXid(), globalSession.getStatus());
  19. if (GlobalStatus.RollbackRetrying.equals(globalStatus)
  20. || GlobalStatus.TimeoutRollbackRetrying.equals(globalStatus)
  21. || GlobalStatus.UnKnown.equals(globalStatus)) {
  22. globalSession.queueToRetryRollback();
  23. LOGGER.info("Global[{}] will retry rollback", globalSession.getXid());
  24. } else if (GlobalStatus.CommitRetrying.equals(globalStatus)) {
  25. globalSession.queueToRetryCommit();
  26. LOGGER.info("Global[{}] will retry commit", globalSession.getXid());
  27. }
  28. }
  29. }

可以看到上面分了三种情况处理全局事务状态,下面我们也分三种情况介绍处理逻辑。

1、提交全局事务

提交全局事务调用了两个方法:removeAllBranches和SessionHelper.endCommitted。
removeAllBranches方法将GlobalSession对象中的分支事务集合清空,同时释放分支事务对数据库记录加的锁,释放锁其实就是将上一篇文章介绍的BucketLockMap对象中的数据库记录和加锁的事务id删除。SessionHelper.endCommitted将修改GlobalSession对象中的状态为已提交(GlobalStatus.Committed):

  1. private void removeAllBranches(GlobalSession globalSession) throws TransactionException {
  2. //得到所有的分支事务
  3. ArrayList<BranchSession> branchSessions = globalSession.getSortedBranches();
  4. for (BranchSession branchSession : branchSessions) {
  5. globalSession.removeBranch(branchSession);
  6. }
  7. }
  8. //下面是GlobalSession中的方法
  9. public void removeBranch(BranchSession branchSession) throws TransactionException {
  10. for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
  11. lifecycleListener.onRemoveBranch(this, branchSession);
  12. }
  13. //释放分支事务加的锁
  14. branchSession.unlock();
  15. //删除分支事务
  16. remove(branchSession);
  17. }
  18. public boolean remove(BranchSession branchSession) {
  19. return branchSessions.remove(branchSession);
  20. }

下面是SessionHelper.endCommitted方法:

  1. public static void endCommitted(GlobalSession globalSession) throws TransactionException {
  2. globalSession.changeStatus(GlobalStatus.Committed);
  3. globalSession.end();
  4. }
  5. //下面是GlobalSession的方法
  6. public void end() throws TransactionException {
  7. // Clean locks first
  8. clean();
  9. for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
  10. lifecycleListener.onEnd(this);
  11. }
  12. }
  13. public void clean() throws TransactionException {
  14. //下面的代码也是释放分支事务的锁,因为removeAllBranches方法已经释放过,所以下面的方法是空执行
  15. LockerManagerFactory.getLockManager().releaseGlobalSessionLock(this);
  16. }

2、回滚全局事务

回滚全局事务的首先也是执行removeAllBranches方法,所以第一步也是释放分支事务加的锁。下面我们重点看一下SessionHelper.endRollbacked:

  1. public static void endRollbacked(GlobalSession globalSession) throws TransactionException {
  2. GlobalStatus currentStatus = globalSession.getStatus();
  3. //isTimeoutGlobalStatus检查当前状态是否是超时的状态
  4. if (isTimeoutGlobalStatus(currentStatus)) {
  5. globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
  6. } else {
  7. globalSession.changeStatus(GlobalStatus.Rollbacked);
  8. }
  9. globalSession.end();
  10. }
  11. public static boolean isTimeoutGlobalStatus(GlobalStatus status) {
  12. return status == GlobalStatus.TimeoutRollbacked
  13. || status == GlobalStatus.TimeoutRollbackFailed
  14. || status == GlobalStatus.TimeoutRollbacking
  15. || status == GlobalStatus.TimeoutRollbackRetrying;
  16. }

endRollbacked方法首先设置事务的状态,然后调用了globalSession.end方法,globalSession.end方法与提交全局事务中调用的是同一个,都是用于释放分支事务锁的。
endRollbacked方法在修改全局事务状态前,会比较当前事务的状态,如果当前事务状态是超时,则设置事务状态为超时回滚成功,如果不是则设置事务状态为回滚成功。
至于全局事务的状态的流转,则在分析完RM和TM之后在介绍。

3、其他事务状态

如果请求中的事务状态不是前面两种,则根据请求直接修改GlobalSession的状态,之后是一个分支判断,下面重点看一下分支判断:

  1. //事务回滚中、超时回滚重试中、未知状态
  2. if (GlobalStatus.RollbackRetrying.equals(globalStatus)
  3. || GlobalStatus.TimeoutRollbackRetrying.equals(globalStatus)
  4. || GlobalStatus.UnKnown.equals(globalStatus)) {
  5. globalSession.queueToRetryRollback();
  6. LOGGER.info("Global[{}] will retry rollback", globalSession.getXid());
  7. }
  8. //提交重试
  9. else if (GlobalStatus.CommitRetrying.equals(globalStatus)) {
  10. globalSession.queueToRetryCommit();
  11. LOGGER.info("Global[{}] will retry commit", globalSession.getXid());
  12. }

分支判断中分别调用了queueToRetryRollback和queueToRetryCommit。
queueToRetryRollback:

  1. public void queueToRetryRollback() throws TransactionException {
  2. //添加监听器
  3. this.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager());
  4. //seata提供了回滚重试管理器,下面这行代码将本全局事务对象添加到回滚重试管理器中
  5. //回滚重试管理器后面文章介绍
  6. SessionHolder.getRetryRollbackingSessionManager().addGlobalSession(this);
  7. GlobalStatus currentStatus = this.getStatus();
  8. //设置事务状态
  9. if (SessionHelper.isTimeoutGlobalStatus(currentStatus)) {
  10. this.changeStatus(GlobalStatus.TimeoutRollbackRetrying);
  11. } else {
  12. this.changeStatus(GlobalStatus.RollbackRetrying);
  13. }
  14. }

queueToRetryCommit:

  1. public void queueToRetryCommit() throws TransactionException {
  2. //设置监听器
  3. this.addSessionLifecycleListener(SessionHolder.getRetryCommittingSessionManager());
  4. //将本全局事务对象添加到重试提交管理器中
  5. //重试提交管理器后面文章介绍
  6. SessionHolder.getRetryCommittingSessionManager().addGlobalSession(this);
  7. //设置事务状态
  8. this.changeStatus(GlobalStatus.CommitRetrying);
  9. }

从queueToRetryRollback和queueToRetryCommit两个方法中看出,当事务需要重试提交或者重试回滚时,都将全局事务对象加入到对应管理器中,由管理器进行后续处理。关于管理器,后面的文章详细分析。

4、总结

总的来说,在处理全局事务报告请求时,分了如下几种情况:
(1)事务提交,更改事务状态为已提交,释放分支事务加的数据库记录锁;
(2)事务回滚,释放分支事务加的数据库记录锁,更改事务状态为回滚成功或者超时回滚成功;
(3)事务回滚中、超时回滚重试中、未知状态,更改事务状态,将GlobalSession对象添加到回滚重试管理器;
(4)提交重试,更改事务状态,将GlobalSession对象添加到重试提交管理器;
(5)其他状态,更改事务状态。

相关文章