Seata解析-TC会话管理器SessionManager

x33g5p2x  于2021-12-21 转载在 其他  
字(5.1k)|赞(0)|评价(0)|浏览(547)

本文基于seata 1.3.0版本

在介绍ServerOnRequestProcessor处理器时,多次提到了会话管理器,比如处理分支事务注册请求时,对全局事务对象加锁就是调用了根会话管理器,再如处理全局事务提交请求中,会将全局事务对象添加到异步提交管理器中,这个异步提交管理器就是会话管理器的一种,还有在分析代码的时候,多次看到往globalSession对象中添加监听器,其实这个监听器就是会话管理器。
在seata中,事务也叫作会话,所以会话管理器其实就是事务管理器。因此会话管理器在seata中的作用非常重要,本文就来详细介绍会话管理器。

一、SessionManager

SessionManager是一个接口,它有三个最终实现类,下图是会话管理器的继承结构。

SessionManager继承了接口SessionLifecycleListener,这是一个会话生命周期监听器,会话管理器能够充当监听器,就是该接口在起作用。SessionLifecycleListener接口的方法定义了要监听的事件。

public interface SessionLifecycleListener {

    /** * 监听全局事务的开启,当处理全局事务开启请求时,就会调用该方法 */
    void onBegin(GlobalSession globalSession) throws TransactionException;

    /** * 监听全局事务对象GlobalSession的状态变化,只要是GlobalSession的状态发生变化,就会调用该方法 */
    void onStatusChange(GlobalSession globalSession, GlobalStatus status) throws TransactionException;

    /** * 监听分支事务状态的变化,在处理分支状态报告请求时,会调用该方法 */
    void onBranchStatusChange(GlobalSession globalSession, BranchSession branchSession, BranchStatus status)
        throws TransactionException;

    /** * 监听新的分支事务注册 */
    void onAddBranch(GlobalSession globalSession, BranchSession branchSession) throws TransactionException;

    /** * 监听分支事务从全局事务对象中移除, * 当处理全局事务回滚请求全局事务提交请求时,都会有移除分支事务的动作,因此都会触发该方法 */
    void onRemoveBranch(GlobalSession globalSession, BranchSession branchSession) throws TransactionException;

    /** * 监听全局事务关闭,也就是监听GlobalSession的close方法。 * 在处理全局事务提交请求和全局事务回滚请求时,都会调用GlobalSession的close方法。 */
    void onClose(GlobalSession globalSession) throws TransactionException;

    /** * 监听全局事务终止,也就是监听GlobalSession的end方法。 * 当要求全局事务提交或者回滚时,无论最后成功与否,seata都会调用GlobalSession的end方法,因此都会触发onEnd */
    void onEnd(GlobalSession globalSession) throws TransactionException;
}

我在每个方法上都做了注释,介绍了每个方法都是在哪些场景下会被调用。
SessionManager接口的方法主要是侧重于管理:

public interface SessionManager extends SessionLifecycleListener, Disposable {

    /** * 将全局事务对象添加到会话管理器中,当全局事务异步提交或者异步回滚时,都会调用该方法 */
    void addGlobalSession(GlobalSession session) throws TransactionException;

    /** * 根据XID查找GlobalSession */
    GlobalSession findGlobalSession(String xid) ;

    /** * 不同的存储模式下,本方法和上面的方法实现不同,如果存储模式是file,则两个方法完全一致, * 如果存储模式是db,则上面的方法相当于调用findGlobalSession(xid, true) * 如果第二个参数为true,表示返回的GlobalSession对象中带有分支事务集合 */
    GlobalSession findGlobalSession(String xid, boolean withBranchSessions);

    /** * 更新事务对象的状态 */
    void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException;

    /** * 从管理器中移除GlobalSession * 当异步提交重试超时时,会调用该方法 */
    void removeGlobalSession(GlobalSession session) throws TransactionException;

    /** * 向GlobalSession中添加分支事务对象,当分支事务注册时,会调用该方法 */
    void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException;

    /** * 更新分支事务状态 */
    void updateBranchSessionStatus(BranchSession session, BranchStatus status) throws TransactionException;

    /** * 从全局事务中移除分支事务,当全局事务提交或者回滚时,会调用该方法 */
    void removeBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException;

    /** * 返回所有的全局会话对象 */
    Collection<GlobalSession> allSessions();

    /** * 根据条件查找符合要求的GlobalSession */
    List<GlobalSession> findGlobalSessions(SessionCondition condition);

    /** * 对全局事务对象加锁,当修改全局事务对象的状态时,都会加锁 */
    <T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
            throws TransactionException;
}

seata提供了SessionManager管理器的三个实现类:DataBaseSessionManager、FileSessionManager、RedisSessionManager
这三个实现类都继承了抽象类:AbstractSessionManager。

二、AbstractSessionManager

AbstractSessionManager实现了两个接口SessionManager和SessionLifecycleListener。
AbstractSessionManager提供了两个属性:

//事务存储管理器,
 	//TransactionStoreManager有三种实现,分别对应了三种存储模式,
 	//TransactionStoreManager用于将事务日志写入数据库、redis或者文件
 	protected TransactionStoreManager transactionStoreManager;
	//表示管理器的名字,如果管理器的实现类是FileSessionManager,那么name的值是文件名
    protected String name;

下图是AbstractSessionManager的方法:

上面这些方法除了onClose、writeSession、destroy、setTransactionStoreManager四个方法之外,其他的方法都是直接或者间接调用writeSession方法,该方法的作用是写事务日志。从这里可以看出,凡是修改分支事务和全局事务状态,增加或删除分支事务,注册或者删除全局事务,都会写事务日志。
writeSession方法也非常简单,直接调用事务存储管理器的writeSession方法写入日志,如果事务日志写入失败,则抛出对应的异常,在下一篇文章中会在介绍该方法:

private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
        if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
            if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
                throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to store global session");
            } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
                throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to update global session");
            } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
                throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to remove global session");
            } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to store branch session");
            } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to update branch session");
            } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to remove branch session");
            } else {
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Unknown LogOperation:" + logOperation.name());
            }
        }
    }

相关文章