Seata解析-TC处理全局事务和分支事务原理详解之分支事务提交结果响应和分支事务回滚结果响应

x33g5p2x  于2021-12-21 转载在 其他  
字(2.7k)|赞(0)|评价(0)|浏览(552)

本文基于seata 1.3.0版本

在《Seata解析-TC处理全局事务和分支事务原理详解之全局事务提交请求和全局事务回滚请求》提到,TC会给各个分支事务发送消息通知分支事务提交(提交是在异步提交管理器中进行的)或者回滚,那这里就涉及到分支事务的响应问题,本文将分析分支事务的两个响应:分支事务提交结果响应和分支事务回滚结果响应。
这两个响应都是由ServerOnResponseProcessor处理器处理的。下面看一下该类的process方法:

  1. public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  2. MessageFuture messageFuture = futures.remove(rpcMessage.getId());
  3. //发送的消息都会创建MessageFuture对象并存储到futures中,
  4. //这样当响应消息到达时,可以通过futures找到MessageFuture对象
  5. //当执行messageFuture.setResultMessage设置返回值,则会触发发送请求的线程运行
  6. //MessageFuture下面会介绍
  7. if (messageFuture != null) {
  8. //本文涉及的两个响应都会进入到该分支中,除非响应超时
  9. messageFuture.setResultMessage(rpcMessage.getBody());
  10. } else {
  11. if (ChannelManager.isRegistered(ctx.channel())) {
  12. //onResponseMessage仅仅打印了日志,比较简单,不再做介绍
  13. onResponseMessage(ctx, rpcMessage);
  14. } else {
  15. //如果当前channel没有注册过,seata任务是非法连接,直接关闭该连接。
  16. try {
  17. if (LOGGER.isInfoEnabled()) {
  18. LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
  19. }
  20. ctx.disconnect();
  21. ctx.close();
  22. } catch (Exception exx) {
  23. LOGGER.error(exx.getMessage());
  24. }
  25. if (LOGGER.isInfoEnabled()) {
  26. LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
  27. }
  28. }
  29. }
  30. }

当给分支事务发送提交或者回滚请求时,都会在futures属性中添加MessageFuture对象,下面是该对象包含的属性:

  1. private RpcMessage requestMessage;//请求报文
  2. private long timeout;//等待响应的超时时间
  3. private long start = System.currentTimeMillis();//请求的发送时间
  4. private transient CompletableFuture<Object> origin = new CompletableFuture<>();

这些属性中最重要的是origin属性,文章下面会看到该属性如何使用。
下面再来看一下请求发送逻辑(删除了部分代码):

  1. protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
  2. //创建MessageFuture对象
  3. MessageFuture messageFuture = new MessageFuture();
  4. //rpcMessage是要发送的请求对象
  5. messageFuture.setRequestMessage(rpcMessage);
  6. //设置超时时间
  7. messageFuture.setTimeout(timeoutMillis);
  8. //将messageFuture保存到futures集合中
  9. futures.put(rpcMessage.getId(), messageFuture);
  10. //发送请求
  11. channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
  12. if (!future.isSuccess()) {
  13. //如果请求发送失败,则从futures集合中删除MessageFuture,并且销毁channel通道
  14. MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
  15. if (messageFuture1 != null) {
  16. messageFuture1.setResultMessage(future.cause());
  17. }
  18. destroyChannel(future.channel());
  19. }
  20. });
  21. try {
  22. //从messageFuture中获取远程返回的数据,如果远程没有返回,这里会一直等待
  23. return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
  24. } catch (Exception exx) {
  25. ...
  26. }
  27. }

上面messageFuture.get方法中调用下面的代码:

  1. try {
  2. result = origin.get(timeout, unit);
  3. } catch (ExecutionException e) {
  4. ...
  5. } catch (TimeoutException e) {
  6. ...
  7. }

这里可以看到,请求发送成功后,会调用CompletableFuture的get方法,当响应没有返回时,线程会一直阻塞在origin.get位置。当远程返回了响应,会在ServerOnResponseProcessor.process方法里面设置:

  1. messageFuture.setResultMessage(rpcMessage.getBody());

这样就会触发origin.get方法返回,也就触发请求线程开始运行。

ServerOnResponseProcessor处理分支事务提交结果响应和分支事务回滚结果响应时,并没有特殊的处理,仅仅是设置了响应结果,然后触发发送请求的线程由阻塞转为运行状态。

相关文章