Seata解析-文件事务存储管理器FileTransactionStoreManager详解

x33g5p2x  于2021-12-21 转载在 其他  
字(4.0k)|赞(0)|评价(0)|浏览(573)

本文基于seata 1.3.0版本

本文将详解FileTransactionStoreManager,该类是由FileSessionManager创建并初始化,也是由FileSessionManager调度使用,作用是将对GlobalSession和BranchSession的所有操作都记录到日志文件中。
FileTransactionStoreManager继承的类和实现的接口相对比较简单,本文不再介绍。
本文从日志文件的结构说起,后面介绍源代码。

一、文件结构

GlobalSession存储到文件的内容,分为两种,一种是固定存储,另一种是非固定存储。固定存储是必须要存的,非固定存储的字段如果其长度为0,就不会存储。

上面这个图是GlobalSession在文件中存储时的内容结构。其中前三个字段和最后三个字段是固定的,无论哪种GlobalSession都必须存储这六个字段。
后面紧跟的是应用名长度,占2个字节,表示的是后面应用名字段的长度,如果应用名长度值是0,表示应用名为null,那么应用名长度字段后面紧跟的就是事务分组长度字段了。后面的事务分组、事务名、XID、applicationData都是一样的规则。最后三个字段记录的是事务开始时间和事务状态和对事务对象施加的操作(新增、删除、修改)。
下图是BranchSession的文件结构:

其字段规则与GlobalSession类似。

二、FileTransactionStoreManager创建

下面看一下FileTransactionStoreManager的构造方法:

public FileTransactionStoreManager(String fullFileName, SessionManager sessionManager) throws IOException {
        //根据文件路径创建事务日志的文件对象
        //fullFileName是文件的路径
        initFile(fullFileName);
        //fileWriteExecutor是异步线程池,用于执行请求命令,比如异步刷盘
        fileWriteExecutor = new ThreadPoolExecutor(MAX_THREAD_WRITE, MAX_THREAD_WRITE, Integer.MAX_VALUE,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
            new NamedThreadFactory("fileTransactionStore", MAX_THREAD_WRITE, true));
        // WriteDataFileRunnable类中有一个阻塞队列storeRequests,
        //用于存储请求命令StoreRequest,在run方法中执行这些命令
        // 该类使用了命令设计模式
        writeDataFileRunnable = new WriteDataFileRunnable();
        fileWriteExecutor.submit(writeDataFileRunnable);
        this.sessionManager = sessionManager;
    }
    private void initFile(String fullFileName) throws IOException {
        //fullFileName是文件的全路径,文件名是root.data
        this.currFullFileName = fullFileName;
        //hisFullFileName表示历史日志文件,文件名是root.data.1
        this.hisFullFileName = fullFileName + HIS_DATA_FILENAME_POSTFIX;
        try {
            currDataFile = new File(currFullFileName);
            if (!currDataFile.exists()) {
                // create parent dir first
                if (currDataFile.getParentFile() != null && !currDataFile.getParentFile().exists()) {
                    currDataFile.getParentFile().mkdirs();
                }
                currDataFile.createNewFile();
                trxStartTimeMills = System.currentTimeMillis();
            } else {
                trxStartTimeMills = currDataFile.lastModified();
            }
            lastModifiedTime = System.currentTimeMillis();
            currRaf = new RandomAccessFile(currDataFile, "rw");
            //将文件的指针指向文件的最后,日志写入从文件的最后开始
            currRaf.seek(currDataFile.length());
            //获取文件channel,数据都是通过channel对象写入文件的
            currFileChannel = currRaf.getChannel();
        } catch (IOException exx) {
            LOGGER.error("init file error,{}", exx.getMessage(), exx);
            throw exx;
        }
    }

构造方法中会创建事务日志的文件对象,获得文件的channel对象,channel对象用于数据写入,然后创建一个线程池,用于执行一些可以异步处理的命令。

三、事务写入文件

每次对事务对象操作的时候,都会调用writeSession方法写入日志文件。
writeSession方法先将GlobalSession或者BranchSession两个对象编码为字节数组,然后将字节数组先写入缓存,之后在从缓存一次性写入文件。这样先写缓存再写文件是因为效率缘故,这里的缓存使用的是直接内存,相对于字节数组直接写入文件,直接内存写入文件,操作系统可以减少一次系统复制,效率大大提高。
之后检查日志文件是否要切换为新文件,如果要切换为新的文件,则将当前文件名字改为root.data.1,如果之前已经有root.data.1了,则直接覆盖,然后创建一个空的root.data文件,以后日志写入这个新创建的日志文件。
最后则是强制将文件刷新至磁盘,防止文件内容丢失。

/** * 日志的写入都是调用该方法 * logOperation表示当前对事务对象做的操作,操作一共有6中,可以参见LogOperation类 * session表示事务对象,可以是GlobalSession也可以是BranchSession */
    public boolean writeSession(LogOperation logOperation, SessionStorable session) {
        //加锁,日志文件写入是单线程写入
        writeSessionLock.lock();
        long curFileTrxNum;
        try {
            //下面的encode方法将事务对象和操作编码为byte数组,日志文件的格式都在encode()里面
            //encode()方法不再做介绍,其返回的字节数组格式与上面第一节介绍的格式是一致的
            //writeDataFile方法将byte数组先写入缓存,待数组内容全部写入后,在从缓存一次性全部写入文件
            if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
            	//如果写入失败,返回false,调用方会根据结果抛出异常
                return false;
            }
            //记录最后的更新时间
            lastModifiedTime = System.currentTimeMillis();
            //FILE_TRX_NUM表示当前已经写入了多少个事务
            curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
            //下面这个判断表示当事务日志每写入PER_FILE_BLOCK_SIZE个并且
            //使用当前日志文件的时间超过MAX_TRX_TIMEOUT_MILLS后,seata会自动将日志文件切换为一个新的
            //MAX_TRX_TIMEOUT_MILLS=30分钟
            if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0
                && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
                //保存30分钟未结束的事务,并将日志文件切换为新的文件
                return saveHistory();
            }
        } catch (Exception exx) {
            LOGGER.error("writeSession error, {}", exx.getMessage(), exx);
            return false;
        } finally {
            //解锁
            writeSessionLock.unlock();
        }
        //强制将日志刷新到磁盘
        flushDisk(curFileTrxNum, currFileChannel);
        return true;
    }

四、文件重新载入

seata启动时,都会重新读取日志文件:root.data.1和root.data。将日志文件中未结束的事务保存到FileSessionManager的sessionMap中,这样可以将事务数据恢复到系统崩溃前的状态。
两个日志文件解析后将事务数据全部读入sessionMap中,之后会再遍历sessionMap中的数据,如果事务已经结束或者提交失败,回滚失败等,这些任务都会从sessionMap中删除,最终留下来的都是未结束的事务对象。

相关文章