Fescar 源码走读(3)之分支事务流程

x33g5p2x  于2021-12-21 转载在 其他  
字(6.1k)|赞(0)|评价(0)|浏览(236)

AsyncWorker目前只实现了branchCommit,用于在分支事务提交后异步删除undo sql记录,目前branchrollback接口还没有实现

private void doBranchCommits() {
    if (ASYNC_COMMIT_BUFFER.size() == 0) {
        return;
    }
    Map<String, List<Phase2Conjava>> mappedConjavas = new HashMap<>();
    Iterator<Phase2Conjava> iterator = ASYNC_COMMIT_BUFFER.iterator();
    while (iterator.hasNext()) {
        Phase2Conjava commitConjava = iterator.next();
        List<Phase2Conjava> conjavasGroupedByResourceId = mappedConjavas.get(commitConjava.resourceId);
        if (conjavasGroupedByResourceId == null) {
            conjavasGroupedByResourceId = new ArrayList<>();
            mappedConjavas.put(commitConjava.resourceId, conjavasGroupedByResourceId);
        }
        conjavasGroupedByResourceId.add(commitConjava);

        iterator.remove();

    }

    for (String resourceId : mappedConjavas.keySet()) {
        Connection conn = null;
        try {
            try {
                DataSourceProxy dataSourceProxy = DataSourceManager.get().get(resourceId);
                conn = dataSourceProxy.getPlainConnection();
            } catch (SQLException sqle) {
                LOGGER.warn("Failed to get connection for async committing on " + resourceId, sqle);
                continue;
            }

            List<Phase2Conjava> conjavasGroupedByResourceId = mappedConjavas.get(resourceId);
            for (Phase2Conjava commitConjava : conjavasGroupedByResourceId) {
                try {
                    UndoLogManager.deleteUndoLog(commitConjava.xid, commitConjava.branchId, conn);
                } catch (Exception ex) {
                    LOGGER.warn("Failed to delete undo log [" + commitConjava.branchId + "/" + commitConjava.xid + "]", ex);
                }
            }

        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                }
            }
        }

    }

}

ConnectionProxy.commit在提交是会调用branchRegiester。在如下方法register()里 

public void commit() throws SQLException {
    if (conjava.inGlobalTransaction()) {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }

        try {
            if (conjava.hasUndoLog()) {
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }

        }
        report(true);
        conjava.reset();

    } else {
        targetConnection.commit();
    }
}

DataSourceManager是rm的实现:

branchRegister通过rpcclient到服务端注册分支事务

DefaultCoordinator来注册分支事务

@Override
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcConjava rpcConjava) throws TransactionException {
    response.setTransactionId(request.getTransactionId());
    response.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcConjava.getClientId(),
            XID.generateXID(request.getTransactionId()), request.getLockKey()));

}

DefaultCore.branchRegister 

public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException {
    GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);

    BranchSession branchSession = new BranchSession();
    branchSession.setTransactionId(XID.getTransactionId(xid));
    branchSession.setBranchId(UUIDGenerator.generateUUID());
    branchSession.setApplicationId(globalSession.getApplicationId());
    branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());
    branchSession.setBranchType(branchType);
    branchSession.setResourceId(resourceId);
    branchSession.setLockKey(lockKeys);
    branchSession.setClientId(clientId);

    if (!branchSession.lock()) {
        throw new TransactionException(LockKeyConflict);
    }
    try {
        globalSession.addBranch(branchSession);
    } catch (RuntimeException ex) {
        throw new TransactionException(FailedToAddBranch);

    }
    return branchSession.getBranchId();
}

分支branch注册到全局事务分支上

commit过程:

第一阶段提交以executeUpdate为例,入口地址如下:

public int executeUpdate() throws SQLException {
    return ExecuteTemplate.execute(this, new StatementCallback<Integer, PreparedStatement>() {
        @Override
        public Integer execute(PreparedStatement statement, Object... args) throws SQLException {
            return statement.executeUpdate();
        }
    });
}

ExecuteTemplate.execute根据不同的语句生成不同的执行器 

public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {

    if (!RootConjava.inGlobalTransaction()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    if (sqlRecognizer == null) {
        sqlRecognizer = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                statementProxy.getConnectionProxy().getDbType());
    }
    Executor<T> executor = null;
    switch (sqlRecognizer.getSQLType()) {
        case INSERT:
            executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case UPDATE:
            executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case DELETE:
            executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case SELECT_FOR_UPDATE:
            executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
            break;
        default:
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
    }
    T rs = null;
    try {
        rs = executor.execute(args);

    } catch (Throwable ex) {
        if (ex instanceof SQLException) {
            throw (SQLException) ex;
        } else {
            // Turn everything into SQLException
            new SQLException(ex);
        }
    }
    return rs;
}

以UpdateExecutor为例,UpdateExecutor.execute通过模板类会调用如下方法: 

public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

具体实现会保存更改前后的数据镜像并插入到undo log里并commit。只有回滚用undo log里的数据生成sql语句回滚 

protected T executeAutoCommitFalse(Object[] args) throws Throwable {
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    statementProxy.getConnectionProxy().prepareUndoLog(sqlRecognizer.getSQLType(), sqlRecognizer.getTableName(), beforeImage, afterImage);
    return result;
}

回滚在上一篇已经说了,就不多说了

由于AT事务在第一阶段已提交,所以commit过程是由Asyncworker异步删除undo log,真正的commit是在第一阶段完成的

public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(xid, branchId, resourceId, applicationData);
}

TableMetaCache:

这个类里存储了缓存的TableMeta(一个小问题,如果tablemeta发生变更如果应用服务没有重启的话,默认15分钟缓存才过期。这种情况应该很少,正常情况下如果tablemeta有变更的话,相应的业务应用应该都有代码变更需要重新部署,或者考虑数据库更改对应的业务兼容性的问题)

相关文章