Seata解析-DefaultCoordinator的四大定时线程池功能详解

x33g5p2x  于2021-12-21 转载在 其他  
字(4.8k)|赞(0)|评价(0)|浏览(540)

本文基于seata 1.3.0版本

在前面文章介绍的时候,可以看到代码有很多地方将全局事务添加到会话管理器中,比如需要提交的全局事务会添加到重试提交事务管理器,这里面就有一个问题,添加到管理器之后的处理流程是什么?本文就来详细介绍后面的流程。

一、创建并启动线程池

在DefaultCoordinator类中定义了五个ScheduledThreadPoolExecutor属性:

//处理需要重试回滚的事务
    private ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1,
        new NamedThreadFactory("RetryRollbacking", 1));
    //处理需要重试提交的事务
    private ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1,
        new NamedThreadFactory("RetryCommitting", 1));
    //处理需要异步提交的事务
    private ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1,
        new NamedThreadFactory("AsyncCommitting", 1));
    //检测超时事务
    private ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1,
        new NamedThreadFactory("TxTimeoutCheck", 1));
    //通知RM删除回滚日志
    private ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1,
        new NamedThreadFactory("UndoLogDelete", 1));

这五个定时线程池就是用于处理加入到事务管理器之后的事务。
它们是在DefaultCoordinator的init方法中启动的:

public void init() {
        //当通知分支事务回滚失败后,将全局事务加入到重试回滚管理器中,
        //下面的线程池用于处理需要回滚的事务
        retryRollbacking.scheduleAtFixedRate(() -> {
            try {
                handleRetryRollbacking();
            } catch (Exception e) {
                LOGGER.info("Exception retry rollbacking ... ", e);
            }
        }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        //在AT模式下,下面的线程池不会使用
        retryCommitting.scheduleAtFixedRate(() -> {
            try {
                handleRetryCommitting();
            } catch (Exception e) {
                LOGGER.info("Exception retry committing ... ", e);
            }
        }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        //AT模式下全局事务提交都是异步提交,需要提交的全局事务都加入到异步提交管理器中,
        //由下面的线程池处理
        asyncCommitting.scheduleAtFixedRate(() -> {
            try {
                handleAsyncCommitting();
            } catch (Exception e) {
                LOGGER.info("Exception async committing ... ", e);
            }
        }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        //遍历全局事务对象,查看是否有超时事务,如果事务超时了,
        //则将全局事务对象加入到重试回滚管理器中,也就是下面交给retryRollbacking线程池处理
        timeoutCheck.scheduleAtFixedRate(() -> {
            try {
                timeoutCheck();
            } catch (Exception e) {
                LOGGER.info("Exception timeout checking ... ", e);
            }
        }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        //通知RM删除若干天前的回滚日志,默认是7天,该线程池的线程定时每天运行一次
        undoLogDelete.scheduleAtFixedRate(() -> {
            try {
                undoLogDelete();
            } catch (Exception e) {
                LOGGER.info("Exception undoLog deleting ... ", e);
            }
        }, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
    }

除了undoLogDelete是每天运行一次之外,其他的都是每秒运行一次。

因为内容涉及比较多,本文只对retryRollbacking做一下代码分析,其他的在代码逻辑上类似,只对功能做一下介绍。
我们先来看一下retryRollbacking线程池。

一、retryRollbacking

在retryRollbacking里面线程执行时调用handleRetryRollbacking方法。

protected void handleRetryRollbacking() {
        //从重试回滚会话管理器中获取所有的会话事务
        //这些事务都是通知分支事务回滚失败的
        Collection<GlobalSession> rollbackingSessions = SessionHolder.getRetryRollbackingSessionManager().allSessions();
        if (CollectionUtils.isEmpty(rollbackingSessions)) {
            return;
        }
        long now = System.currentTimeMillis();
        //遍历需要回滚的事务
        for (GlobalSession rollbackingSession : rollbackingSessions) {
            try {
                // prevent repeated rollback
                //首先检查事务状态,如果事务状态为回滚中,且事务从创建到此时为止未超过12s,那么认为事务正在回滚,跳过对该事务的处理
                if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking) && !rollbackingSession.isRollbackingDead()) {
                    continue;
                }
                //检查是否重试超时,如果按照默认值的话,下面这个if永远为false,因为MAX_ROLLBACK_RETRY_TIMEOUT始终是-1
                //可以使用server.maxRollbackRetryTimeout设置MAX_ROLLBACK_RETRY_TIMEOUT的值,必须是正值才有效
                //如果事务从创建到此时的时间超过了MAX_ROLLBACK_RETRY_TIMEOUT,那么认为事务无法回滚了
                if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
                    //如果多次回滚都未能成功,那么要检查配置是否允许未回退成功的事务解锁
                    //如果允许则解锁
                    if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
                        rollbackingSession.clean();
                    }
                    /** * Prevent thread safety issues */
                    //如果多次回滚都未能成功,那么以后就不再对该事务处理了,从事务管理器中删除该事务
                    SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
                    LOGGER.info("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());
                    continue;
                }
                rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
                //再次尝试回滚事务
                //doGlobalRollback方法里面要遍历全局事务下的每个分支事务,然后通知分支事务做回滚
                //如果分支事务都可以正常回滚,那么从事务管理器中删除该全局事务,
                //如果不能,那么下次定时线程运行的时候还要重复执行上述逻辑
                core.doGlobalRollback(rollbackingSession, true);
            } catch (TransactionException ex) {
                LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
            }
        }
    }

core.doGlobalRollback方法在文章《Seata解析-TC处理全局事务和分支事务原理详解之全局事务提交请求和全局事务回滚请求》已经介绍过了,这里不再介绍。

handleRetryRollbacking总起来说还是比较简单的,从回滚重试管理器中获得所有需要回滚的时候,然后判断事务已经运行的时间,如果事务长时间未关闭,则认为事务无法完成回滚了,那就从管理器中删除该事务,这种事务可能需要人工手动处理了,否则进入doGlobalRollback方法,该方法会再次通知各个分支事务做回滚。

二、asyncCommitting

本线程池是做异步提交的。
全局事务提交时,不是同步提交的,而是交给异步提交管理器去做。提交使用异步的原因是,分支事务执行无误的情况下,会自动提交,相对来说全局事务的提交已经没有那么重要了,全局事务提交成功与否都不对业务数据造成影响。
本线程首先从异步提交管理器中获得所有的全局事务对象,之后遍历每个全局事务对象的分支事务对象,根据分支事务对象中记录的连接信息,通知RM提交事务,如果所有分支都提交成功,则修改全局事务状态为提交成功。如果分支事务提交失败,则对应的全局事务对象在下次线程启动时重复上述流程。

三、timeoutCheck

本线程池是对根会话管理器中全局事务对象进行检测,如果发现事务处于开始状态且已经超时,则将GlobalSession对象的状态修改为超时回滚中,激活状态为false,然后将其添加到重试回滚会话管理器中。后续由重试回滚会话管理器对事务回滚。

四、undoLogDelete

通知所有的RM删除一段时间以前的回滚日志。默认是删除7天以前的日期。可以通过参数server.undo.logSaveDays配置其他时间。

相关文章