Seata解析-详解RM及TM的初始化过程

x33g5p2x  于2021-12-21 转载在 其他  
字(10.2k)|赞(0)|评价(0)|浏览(824)

本文基于seata 1.3.0版本

虽然RM和TM功能不相同,但是两者都部署在应用端,和应用程序部署在一起,RM和TM的初始化都是由同一位置触发的。
本文将详细介绍RM和TM的初始化过程。

一、GlobalTransactionScanner对象的创建

RM、TM的初始化都是由类GlobalTransactionScanner触发的,那么先来介绍GlobalTransactionScanner对象是如何创建的。
创建GlobalTransactionScanner对象时,需要设置两个参数:

  1. spring.application.name:应用名
  2. spring.cloud.alibaba.seata.tx-service-group:事务分组

1、自己编程创建

我们可以通过自己编程创建GlobalTransactionScanner对象。代码如下:

  1. @Bean
  2. public GlobalTransactionScanner globalTransactionScanner() {
  3. //应用名
  4. String applicationName = applicationContext.getEnvironment()
  5. .getProperty("spring.application.name");
  6. //事务分组
  7. String txServiceGroup = applicationContext.getEnvironment()
  8. .getProperty("spring.cloud.alibaba.seata.tx-service-group");
  9. return new GlobalTransactionScanner(applicationName, txServiceGroup);
  10. }

2、使用GlobalTransactionAutoConfiguration

GlobalTransactionAutoConfiguration是由spring-cloud-alibaba-seata提供的,需要在pom.xml文件中引入spring-cloud-alibaba-seata。
GlobalTransactionAutoConfiguration是一个自动配置类,因此在spring boot启动的时候便可以加载,并创建需要的bean。
其创建GlobalTransactionScanner的方法如下,与上一小节的方法代码类似:

  1. @Bean
  2. public GlobalTransactionScanner globalTransactionScanner() {
  3. String applicationName = applicationContext.getEnvironment()
  4. .getProperty("spring.application.name");
  5. String txServiceGroup = seataProperties.getTxServiceGroup();
  6. //如果没有配置事务分组,
  7. //则使用应用名+-fescar-service-group作为事务分组
  8. if (StringUtils.isEmpty(txServiceGroup)) {
  9. txServiceGroup = applicationName + "-fescar-service-group";
  10. seataProperties.setTxServiceGroup(txServiceGroup);
  11. }
  12. return new GlobalTransactionScanner(applicationName, txServiceGroup);
  13. }

3、使用SeataAutoConfiguration

SeataAutoConfiguration是由seata-spring-boot-starter提供的,该类也是一个自动配置类。在类中提供了方法globalTransactionScanner()来创建GlobalTransactionScanner对象,该方法的代码与GlobalTransactionAutoConfiguration类似,不再介绍。

二、GlobalTransactionScanner初始化

GlobalTransactionScanner实现了InitializingBean接口,所以在GlobalTransactionScanner对象创建完毕之后,spring容器执行afterPropertiesSet()方法。
afterPropertiesSet方法主要是初始化TM和RM。

  1. public void afterPropertiesSet() {
  2. //disableGlobalTransaction表示是否启用全局事务,
  3. //如果设置false,则seata的分布式事务不起作用
  4. //可以通过transport.enableClientBatchSendRequest设置
  5. if (disableGlobalTransaction) {
  6. if (LOGGER.isInfoEnabled()) {
  7. LOGGER.info("Global transaction is disabled.");
  8. }
  9. return;
  10. }
  11. //初始化
  12. initClient();
  13. }
  14. private void initClient() {
  15. if (LOGGER.isInfoEnabled()) {
  16. LOGGER.info("Initializing Global Transaction Clients ... ");
  17. }
  18. if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
  19. throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
  20. }
  21. //init TM
  22. //TM初始化
  23. //TM的初始化主要完成以下几件事:
  24. //1、创建连接池
  25. //2、创建客户端Netty,并启动
  26. //3、创建并启动用于检测的线程池
  27. TMClient.init(applicationId, txServiceGroup);
  28. if (LOGGER.isInfoEnabled()) {
  29. LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
  30. }
  31. //init RM
  32. //RM初始化,其初始化过程与TM类似
  33. RMClient.init(applicationId, txServiceGroup);
  34. if (LOGGER.isInfoEnabled()) {
  35. LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
  36. }
  37. if (LOGGER.isInfoEnabled()) {
  38. LOGGER.info("Global Transaction Clients are initialized. ");
  39. }
  40. //注册关闭回调钩子
  41. registerSpringShutdownHook();
  42. }

从initClient方法中可以看到,其主要做了TM和RM的初始化。RM的初始化与TM的初始化动作类似,下面重点分析TM。

1、TM初始化

TM的初始化主要完成以下三件事:

  1. 创建连接池
  2. 创建客户端Netty,并启动
  3. 创建并启动用于检测的线程池

下面从代码上来分析一下如何实现的。
TMClient的init方法如下:

  1. public static void init(String applicationId, String transactionServiceGroup) {
  2. TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
  3. tmNettyRemotingClient.init();
  4. }

init()方法首先获取TmNettyRemotingClient实例,然后调用其init()方法。下面来看一下TmNettyRemotingClient.getInstance()方法。

  1. public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
  2. TmNettyRemotingClient tmNettyRemotingClient = getInstance();
  3. //设置应用名
  4. tmNettyRemotingClient.setApplicationId(applicationId);
  5. //设置服务分组
  6. tmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
  7. return tmNettyRemotingClient;
  8. }
  9. public static TmNettyRemotingClient getInstance() {
  10. if (instance == null) {
  11. synchronized (TmNettyRemotingClient.class) {
  12. if (instance == null) {
  13. //Netty配置对象,针对Netty的大部分配置信息都在该类中
  14. NettyClientConfig nettyClientConfig = new NettyClientConfig();
  15. final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
  16. nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
  17. KEEP_ALIVE_TIME, TimeUnit.SECONDS,
  18. new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
  19. new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
  20. nettyClientConfig.getClientWorkerThreads()),
  21. RejectedPolicies.runsOldestTaskPolicy());
  22. //创建TmNettyRemotingClient对象
  23. instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
  24. }
  25. }
  26. }
  27. return instance;
  28. }
  29. //TmNettyRemotingClient父类AbstractNettyRemotingClient的构造方法如下:
  30. public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
  31. ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
  32. super(messageExecutor);
  33. this.transactionRole = transactionRole;
  34. //NettyClientBootstrap的作用是管理Netty客户端,启动Netty、关闭Netty、创建新的链接
  35. clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
  36. //设置Netty的责任链
  37. clientBootstrap.setChannelHandlers(new ClientHandler());
  38. //clientChannelManager是一个连接池管理器,管理与服务端的连接,当需要连接时,直接从连接池中获取
  39. //如果连接在连接池中不存在,则通过clientBootstrap获得新连接
  40. clientChannelManager = new NettyClientChannelManager(
  41. new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
  42. }

TmNettyRemotingClient对象实例化时,主要是调用父类的构造方法,也就是AbstractNettyRemotingClient的构造方法。在AbstractNettyRemotingClient构造方法中,创建了Netty客户端启动器,设置了Netty的处理器,创建了连接池管理器。这里都只是创建了对象,比如Netty客户端启动器只是创建了对象,并没有启动Netty客户端,连接池管理器也是一样,连接池管理器中并没有连接。
到这里TmNettyRemotingClient实例创建完毕,下面来看一下TmNettyRemotingClient的init方法。

  1. public void init() {
  2. //注册处理器
  3. registerProcessor();
  4. if (initialized.compareAndSet(false, true)) {
  5. //启动客户端Netty,启动检查连接是否可用的定时任务
  6. //代码见下方
  7. super.init();
  8. }
  9. }
  10. private void registerProcessor() {
  11. // 1.registry TC response processor
  12. //注册处理器
  13. //除了心跳报文外,其他的消息都是由ClientOnResponseProcessor处理的
  14. //ClientOnResponseProcessor用于处理响应消息,Netty会将响应消息交给本处理器处理
  15. ClientOnResponseProcessor onResponseProcessor =
  16. new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
  17. super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
  18. super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
  19. super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
  20. super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
  21. super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
  22. super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
  23. super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
  24. // 2.registry heartbeat message processor
  25. ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
  26. super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
  27. }
  28. //下面代码是TmNettyRemotingClient父类AbstractNettyRemotingClient的init()方法
  29. public void init() {
  30. //启动检测连接是否可用的定时任务
  31. //每过10s定时任务启动一次检查与TC的连接是否可用,如果发现不可用,则自动重连
  32. timerExecutor.scheduleAtFixedRate(new Runnable() {
  33. @Override
  34. public void run() {
  35. clientChannelManager.reconnect(getTransactionServiceGroup());
  36. }
  37. }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
  38. //判断是否允许客户端批量发送请求消息,默认是true
  39. if (NettyClientConfig.isEnableClientBatchSendRequest()) {
  40. //如果允许,则启动一个线程池,该线程池循环检查是否有可发送的消息,如果有,则收集所有发送到同一机器的消息,
  41. //然后将这些消息打包成一个对象MergedWarpMessage,之后将该对象发送到服务端。
  42. mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
  43. MAX_MERGE_SEND_THREAD,
  44. KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
  45. new LinkedBlockingQueue<>(),
  46. new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
  47. mergeSendExecutorService.submit(new MergedSendRunnable());
  48. }
  49. //调用父类的init方法,在父类方法中启动了一个定时任务,用于清理超时请求
  50. //客户端发送每个请求都会记录到集合futures中,等待服务端返回后再从集合中删除
  51. //如果服务端超时,只能通过定时任务删除了
  52. super.init();
  53. //启动客户端Netty,下面方法执行完成,表示客户端可以建立与服务端的连接了
  54. clientBootstrap.start();
  55. }

TmNettyRemotingClient的init方法主要完成下面四项工作:

  1. 启动检测连接是否可用的定时任务;
  2. 启动批量发送请求消息的线程池;
  3. 启动定时任务,清理超时请求
  4. 启动客户端Netty,将ClientHandler设置到责任链中。

2、RM初始化

RM的初始化与TM基本类似,所不同的是,RMClient.init方法中创建的对象是RmNettyRemotingClient。

  1. public static void init(String applicationId, String transactionServiceGroup) {
  2. RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
  3. //设置默认资源管理器
  4. rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
  5. //设置事务处理器
  6. rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
  7. rmNettyRemotingClient.init();
  8. }

因为RmNettyRemotingClient和TmNettyRemotingClient都继承相同的父类,创建对象都是调用父类的构造方法,所以对象创建流程都是一样的。
还有一点不同的是,RmNettyRemotingClient注册的处理器不同。

  1. private void registerProcessor() {
  2. // 1.registry rm client handle branch commit processor
  3. RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
  4. super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
  5. // 2.registry rm client handle branch commit processor
  6. RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
  7. super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
  8. // 3.registry rm handler undo log processor
  9. RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
  10. super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
  11. // 4.registry TC response processor
  12. ClientOnResponseProcessor onResponseProcessor =
  13. new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
  14. super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
  15. super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
  16. super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
  17. super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
  18. super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
  19. // 5.registry heartbeat message processor
  20. ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
  21. super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
  22. }

相关文章