Seata解析-文件会话管理器FileSessionManager详解

x33g5p2x  于2021-12-21 转载在 其他  
字(7.2k)|赞(0)|评价(0)|浏览(479)

本文基于seata 1.3.0版本

在《Seata解析-TC会话管理器SessionManager》中介绍了SessionManager管理器有三个实现类:DataBaseSessionManager、FileSessionManager、RedisSessionManager,可以通过store.mode指定需要使用的管理器实现类,store.mode在file.conf文件中配置,有三个值:db、file、redis,分别对应了上面三个实现类。这三个管理器的区别在于事务日志的写入位置不用,比如DataBaseSessionManager将事务日志写入数据库,FileSessionManager是写入文件。
本文将分析FileSessionManager,另外两个和FileSessionManager类似,以后文章在分析介绍。

一、FileSessionManager

FileSessionManager从名字上可以看出这个管理器和文件相关。
下面先来介绍FileSessionManager如何被创建的。

1、创建

在服务端启动的时候也就是执行Server类的main方法时,会执行下面这行代码:

  1. SessionHolder.init(parameterParser.getStoreMode());

入参parameterParser.getStoreMode()表示从启动命令里面获取设置的存储模式,可以通过“-m”在启动命令里面指定存储模式,和在file.conf文件了设置store.mode是一样的,我们来看一下SessionHolder.init:

  1. public static void init(String mode) throws IOException {
  2. //如果启动命令里面设置为空,那么从file.conf文件里获取
  3. //所以启动命令可以覆盖配置文件
  4. if (StringUtils.isBlank(mode)) {
  5. mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
  6. }
  7. StoreMode storeMode = StoreMode.get(mode);
  8. //下面根据store.mode值的不同,指定不同的分支
  9. if (StoreMode.DB.equals(storeMode)) {
  10. ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
  11. ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
  12. new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
  13. RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
  14. new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
  15. RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
  16. new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
  17. } else if (StoreMode.FILE.equals(storeMode)) {
  18. String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
  19. DEFAULT_SESSION_STORE_FILE_DIR);
  20. if (StringUtils.isBlank(sessionStorePath)) {
  21. throw new StoreException("the {store.file.dir} is empty.");
  22. }
  23. ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
  24. new Object[] {ROOT_SESSION_MANAGER_NAME, sessionStorePath});
  25. ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
  26. new Class[] {String.class, String.class}, new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME, null});
  27. RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
  28. new Class[] {String.class, String.class}, new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME, null});
  29. RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
  30. new Class[] {String.class, String.class}, new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME, null});
  31. } else if (StoreMode.REDIS.equals(storeMode)) {
  32. ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
  33. ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
  34. StoreMode.REDIS.getName(), new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
  35. RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
  36. StoreMode.REDIS.getName(), new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
  37. RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
  38. StoreMode.REDIS.getName(), new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
  39. } else {
  40. // unknown store
  41. throw new IllegalArgumentException("unknown store mode:" + mode);
  42. }
  43. reload();
  44. }

上面init方法我们只关注file分支,也就是FileSessionManager的创建。

  1. //从file.conf文件中查找属性store.file.dir的值,该属性设置了事务日志的存储目录
  2. //默认目录为sessionStore
  3. String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
  4. DEFAULT_SESSION_STORE_FILE_DIR);
  5. if (StringUtils.isBlank(sessionStorePath)) {
  6. throw new StoreException("the {store.file.dir} is empty.");
  7. }
  8. //创建根会话管理器,ROOT_SESSION_MANAGER_NAME指定了文件名:root.data,该会话管理器的数据会存储到root.data文件中
  9. ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
  10. new Object[] {ROOT_SESSION_MANAGER_NAME, sessionStorePath});
  11. // 异步提交事务管理器,
  12. // 第一个参数ASYNC_COMMITTING_SESSION_MANAGER_NAME设置了文件名,表示该管理器数据存储到该文件中,
  13. // 第二个参数指定了文件的路径,这里设置为null,表示该管理器数据不存储文件,因此第一个参数其实没有用处
  14. //以下两个管理创建使用的参数含义类似。
  15. ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
  16. new Class[] {String.class, String.class}, new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME, null});
  17. //重试提交事务管理器
  18. RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
  19. new Class[] {String.class, String.class}, new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME, null});
  20. //重试回滚事务管理器
  21. RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
  22. new Class[] {String.class, String.class}, new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME, null});

从上面的分析中可以看出,FileSessionManager类名的中的file其实指的是文件root.data,根会话管理器会将事务日志写入该文件中。
上面一共创建了四个会话管理器。关于这四个会话管理器的作用,本文后面介绍。
下面来看一下构造方法:

  1. public FileSessionManager(String name, String sessionStoreFilePath) throws IOException {
  2. //调用父类的构造方法,主要就是设置管理器的名字
  3. super(name);
  4. if (StringUtils.isNotBlank(sessionStoreFilePath)) {
  5. //如果路径不为空的,则创建FileTransactionStoreManager对象
  6. transactionStoreManager = new FileTransactionStoreManager(
  7. sessionStoreFilePath + File.separator + name, this);
  8. } else {
  9. //如果路径是空的,那么事务日志无法写入日志文件,seata提供了一个writeSession空实现
  10. //任何写入都默认是成功的
  11. transactionStoreManager = new AbstractTransactionStoreManager() {
  12. @Override
  13. public boolean writeSession(LogOperation logOperation, SessionStorable session) {
  14. return true;
  15. }
  16. };
  17. }
  18. }

FileSessionManager还提供了一个属性:

  1. private Map<String, GlobalSession> sessionMap = new ConcurrentHashMap<>();

该属性的作用是在内存中记录所有的全局事务,key是事务id,因为事务日志写入了文件,而从文件中获取关于全局事务信息非常影响性能,所以FileSessionManager在将日志写入文件的同时,也会更新sessionMap。当seata重启后,FileSessionManager也会读取日志文件,将事务信息写入sessionMap中。
因为FileSessionManager涉及的方法也比较多,我们从中抽取两个方法看一下:

  1. public void addGlobalSession(GlobalSession session) throws TransactionException {
  2. //调用父类方法将session写入文件中
  3. super.addGlobalSession(session);
  4. //然后更新内存
  5. sessionMap.put(session.getXid(), session);
  6. }
  7. @Override
  8. public GlobalSession findGlobalSession(String xid) {
  9. //查询时直接从内存中读取
  10. return sessionMap.get(xid);
  11. }
  12. public void removeGlobalSession(GlobalSession session) throws TransactionException {
  13. //删除时,先写入文件,然后从内存中删除
  14. super.removeGlobalSession(session);
  15. sessionMap.remove(session.getXid());
  16. }

2、日志写入

在FileSessionManager的构造方法中创建了FileTransactionStoreManager对象,该对象负责将事务日志写入文件。比如FileSessionManager的addGlobalSession方法最终会调用FileTransactionStoreManager的writeSession方法将事务信息写入文件。
因为FileTransactionStoreManager类内容非常多,所以本文只介绍写入流程,源码解析在以后的文章介绍。

seata保留两个日志文件,一个是当前正在写入的文件root.data,另一个是历史文件,历史文件的文件名为root.data.1。每当有新的历史文件生成时,都会将之前的历史文件覆盖。
在上面的流程图中有个问题是为什么日志文件切换时要将超过30分钟未提交的事务重新写入文件?因为事务文件只保留两个,而且日志文件切换时间间隔是30分钟,如果不把超过30分钟的事务重新写入文件一遍,那么日志文件切换时会覆盖之前写入文件的事务信息,这样seata一旦重启,这些超过30分钟的事务信息就无法恢复。

3、重新载入

在第一小节介绍init方法时,在该方法的最后调用了reload方法。该方法用于读取两个日志文件,然后将日志文件里面的未提交或者未回滚的事务信息写入到sessionMap中。这样未结束的事务信息就可以重新恢复,事务信息不会丢失。

4、总结

FileSessionManager的功能分为两个方面:一是在sessionMap中保存所有的事务对象GlobalSession,其保存着GlobalSession的最新数据,二是将GlobalSession的变化情况通过FileTransactionStoreManager写入日志文件中。

二、四种功能会话管理器介绍

在上面第一小节的init方法中可以看到,针对每种存储模式,seata都提供了四种不同功能的会话管理器:根会话管理器,异步提交事务管理器,重试提交事务管理器,重试回滚事务管理器。这四种管理器的使用场景不同。

  • 根会话管理器:所有的事务新增删除、状态修改都要通过该管理器完成,全局所有的事务信息都在该管理器中保存。当全局事务提交或者回滚时,就会从该事务管理器中删除,即使提交或者回滚失败也会从管理器中删除。
  • 异步提交事务管理器:当全局事务可以异步提交时,会将全局事务对象添加到该事务管理器中,后续异步线程访问该管理器,并从中取出所有的事务对象进行后续处理。
  • 重试提交事务管理器:当事务提交失败时,那么该全局事务就可能会被移交给重试提交事务管理器,这样事务的重试提交就由重试提交事务管理器处理。但是在AT模式下,如果事务提交失败会继续由异步提交事务管理器继续重试。
  • 重试回滚事务管理器:当全局事务需要回滚时,如果回滚失败,那么该事务会被加入到重试回滚事务管理器中,有该管理器异步再次通知各个分支事务回滚。

相关文章