文章23 | 阅读 17176 | 点赞0
本文基于seata 1.3.0版本
本文继续介绍TC对请求的处理。
本文将分析全局事务提交请求和全局事务回滚请求的处理逻辑。
全局事务提交请求的消息类型是MessageType.TYPE_GLOBAL_COMMIT,请求对象为GlobalCommitRequest,该请求由TM发起。该请求的作用是报告某个全局事务执行完毕请求提交。TC收到后要做出以下更改:
这里简单介绍一下异步提交管理器后续对全局事务做的处理是:通知各个分支事务,要求分支事务提交,如果分支事务提交都成功了,则修改全局事务状态为提交成功,如果有失败的,则重试通知。异步提交管理器后面有文章专门做介绍。
从上面的描述可以看出,全局事务提交请求主要是释放分支事务锁,通知分支事务提交,最后修改全局事务状态。
下面来看一下代码逻辑。下面直接从DefaultCore的commit方法开始介绍。
public GlobalStatus commit(String xid) throws TransactionException {
//根据XID找到全局事务对象GlobalSession
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
//如果GlobalSession没有找到,说明当前事务是非seata管理的
return GlobalStatus.Finished;
}
//添加监听器
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
//shouldCommit表示当前事务是否可以提交,如果事务状态为GlobalStatus.Begin,则可以提交,否则不可以
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
// the lock should release after branch commit
// Highlight: Firstly, close the session, then no more branch can be registered.
//closeAndClean:将关闭消息通知上面注册的监听器,然后释放分支事务锁
globalSession.closeAndClean();
//如果事务状态为开始,那么将状态直接改为提交中
if (globalSession.getStatus() == GlobalStatus.Begin) {
globalSession.changeStatus(GlobalStatus.Committing);
return true;
}
return false;
});
if (!shouldCommit) {
//返回当前事务状态,表示禁止全局事务提交,事务提交失败
return globalSession.getStatus();
}
//遍历分支事务是否可以异步提交,如果分支事务有TCC或者XA的,则不能异步提交
//本文介绍的场景都是AT模式的,因此globalSession.canBeCommittedAsync()返回true
//下面只介绍if分支
if (globalSession.canBeCommittedAsync()) {
//执行异步提交
globalSession.asyncCommit();
//返回事务状态为提交成功
return GlobalStatus.Committed;
} else {
//执行同步提交
doGlobalCommit(globalSession, false);
}
//返回最终事务状态
return globalSession.getStatus();
}
上面代码使用到了globalSession.asyncCommit方法,下面来看一下这个方法:
public void asyncCommit() throws TransactionException {
//添加监听器
this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());
//将当前全局事务添加到异步提交管理器,异步提交管理器后续文章介绍
SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);
//修改事务状态为异步提交中
this.changeStatus(GlobalStatus.AsyncCommitting);
}
从asyncCommit方法可以看出在AT模式下,通知分支事务提交委托给异步提交管理器去完成了。
不知道大家有没有一个疑问,为什么不先通知分支事务提交,之后根据提交结果再释放分支事务锁?
这里我理解是,既然TM通知全局事务提交了,那么说明当前事务的执行过程是成功的,没有错误,而且分支事务在执行结束后就已经提交了,也就是在全局事务提交前,分支事务对数据的改变就已经写入数据库了,TC通知分支事务提交是为了处理回滚日志等,这些处理与业务处理关系不大,即使分支事务提交失败也不会有影响,但是分支事务加的锁就不同了,如果锁一直加着,就会影响其他事务的执行,严重可能造成大量事务执行失败,所以先释放锁,让其他需要锁的事务可以正常执行,至于分支事务提交可以异步进行,即使失败也没有影响。
全局事务回滚请求的消息类型是MessageType.TYPE_GLOBAL_ROLLBACK,请求对象为GlobalRollbackRequest,该请求由TM发起。该请求的作用是回滚全局事务。TC收到后要做出以下更改:
对于上述流程,有个问题就是对于回滚失败的分支事务怎么处理,因为一个全局事务包含多个分支事务,可能有的回滚成功了,有的回滚失败,这样会造成事务的不一致,这个问题等到介绍TM的时候在分析。
下面看一下具体的代码实现,从DefaultCore的rollback方法看起:
public GlobalStatus rollback(String xid) throws TransactionException {
//获得XID对应的GlobalSession
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
//如果没有找到,说明当前事务不是seat管理的
return GlobalStatus.Finished;
}
//添加监听器
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
//lockAndExecute方法会对全局事务对象GlobalSession加锁,然后执行里面的回调方法,
//回调成功后,再解锁
boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
//close方法是将关闭事件通知给之前注册的监听器
globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
if (globalSession.getStatus() == GlobalStatus.Begin) {
//修改状态为回滚中
globalSession.changeStatus(GlobalStatus.Rollbacking);
return true;
}
return false;
});
if (!shouldRollBack) {
return globalSession.getStatus();
}
doGlobalRollback(globalSession, false);
return globalSession.getStatus();
}
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start rollback event
//发布事务回滚中的事件
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getBeginTime(), null, globalSession.getStatus()));
//当前使用的模式是AT模式,因此只执行else分支
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
} else {
//遍历全局事务下的每个分支事务
for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
BranchStatus currentBranchStatus = branchSession.getStatus();
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
//如果分支事务在一阶段失败了,说明事务更改没有写入数据库,
//则分支事务无需做操作,将该分支事务从全局事务中删除即可
globalSession.removeBranch(branchSession);
continue;
}
try {
//branchRollback:构建BranchRollbackRequest请求对象,通知分支事务做回滚
BranchStatus branchStatus = branchRollback(globalSession, branchSession);
switch (branchStatus) {
case PhaseTwo_Rollbacked:
//如果二阶段回退成功,则直接将分支事务从全局事务中删除
globalSession.removeBranch(branchSession);
LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
continue;
case PhaseTwo_RollbackFailed_Unretryable:
//事务回滚失败,则修改全局事务状态为回滚失败,并且释放所有的分支事务锁
//然后给TM返回回滚失败
SessionHelper.endRollbackFailed(globalSession);
LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
return false;
default:
LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
if (!retrying) {
//将全局事务加入到回滚重试管理器
globalSession.queueToRetryRollback();
}
return false;
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex,
"Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
if (!retrying) {
//如果异常了,则将全局事务加入到回滚重试管理器
globalSession.queueToRetryRollback();
}
throw new TransactionException(ex);
}
}
// In db mode, there is a problem of inconsistent data in multiple copies, resulting in new branch
// transaction registration when rolling back.
// 1. New branch transaction and rollback branch transaction have no data association
// 2. New branch transaction has data association with rollback branch transaction
// The second query can solve the first problem, and if it is the second problem, it may cause a rollback
// failure due to data changes.
//能进行到这个位置,说明分支事务全部回滚成功了,GlobalSession中已经没有分支事务了
//根据注释,增加下面的代码应该是在db模式下出过问题,但是新分支注册会对全局事务对象加锁,回滚也会加锁,
//所以回滚时,不会发生新的分支事务注册情况,对于上面的注释大家如果知道原因烦请指导
GlobalSession globalSessionTwice = SessionHolder.findGlobalSession(globalSession.getXid());
if (globalSessionTwice != null && globalSessionTwice.hasBranch()) {
LOGGER.info("Rollbacking global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
}
//success永远都是true
if (success) {
//endRollbacked更改全局事务状态为回滚成功,并且释放分支事务锁
SessionHelper.endRollbacked(globalSession);
// rollbacked event
//发布回滚成功事件
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getBeginTime(), System.currentTimeMillis(),
globalSession.getStatus()));
LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
}
return success;
}
全局事务回滚请求的代码逻辑和全局事务提交请求的逻辑比较类似。
全局事务提交请求和全局事务回滚请求的代码里面都在GlobalSession对象中添加了监听器,然后在执行下面这行代码时,会通知监听器:
globalSession.close();
//globalSession的close方法
public void close() throws TransactionException {
if (active) {
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onClose(this);
}
}
}
下面我们看一下监听器的处理逻辑:
public void onClose(GlobalSession globalSession) throws TransactionException {
globalSession.setActive(false);
}
监听器的处理还是非常简单的,只是将globalSession对象有激活状态改为非激活状态,改为非激活状态后,新分支事务就不能在注册了。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_38308374/article/details/108512780
内容来源于网络,如有侵权,请联系作者删除!