Seata解析-seata核心类NettyRemotingServer详解

x33g5p2x  于2021-12-21 转载在 其他  
字(13.0k)|赞(0)|评价(0)|浏览(732)

本文基于seata 1.3.0版本

NettyRemotingServer是TM服务端的核心类之一,它用于启动netty,监听服务器端口,接收TM、RM的请求,它还为处理不同的请求创建不同的处理器。本文将详解NettyRemotingServer类。

一、NettyRemotingServer继承体系

下图是NettyRemotingServer的继承图,里面展示的方法都是protected及以上的:

从上图可以看出,NettyRemotingServer实现了Disposable和RemotingServer接口,继承了AbstractNettyRemotingServer和AbstractNettyRemoting类。
Disposable接口只有一个destroy方法,该方法是在JVM停止的时候,回调钩子调用。
RemotingServer接口定义了发送请求和响应信息的方法以及一个注册处理器的方法,从名字上可以看出这个接口是用于服务端的。
AbstractNettyRemoting抽象类定义了同步和异步发送消息方法(sendSync和sendAsync)以及将收到的消息转交给对应处理器处理的方法(processMessage),该类定义发送消息和接受消息的基础方法,该类是服务端和客户端实现与Netty相关的远程通讯的基类。
AbstractNettyRemotingServer实现了RemotingServer接口,继承了AbstractNettyRemoting抽象类,该类完成下面几个功能:

  • 当发送消息时,该类会调用AbstractNettyRemoting中对应的方法进行发送;
  • 实现了registerProcessor处理器注册方法,可以注册处理器,处理器注册到该类的属性processorTable中;
  • 定义了内部类ServerHandler,它实现自Netty的ChannelDuplexHandler,ServerHandler会被注册到Netty的责任链中,因此当收到消息时,所有的消息都会进入到ServerHandler的channelRead方法中处理;
  • 该类还有一个属性NettyServerBootstrap,NettyServerBootstrap的作用是用于启动和停止服务端Netty,该类的init方法调用NettyServerBootstrap的start方法启动服务端netty。

NettyRemotingServer的init方法有两个作用:一是调用registerProcessor注册处理器,二是调用父类的init方法启动服务端netty。
服务端主要是接收消息并对其进行处理,因此需要重点关注启动流程和接收消息流程,下图展示了启动和接收消息场景中关键方法的调用关系:

二、接收消息处理器

服务端接收的消息最终都是由处理器处理的,那么seata服务端一共注册了哪些处理器?
服务端的处理器都是通过NettyRemotingServer的registerProcessor方法注册的,该方法在服务端启动的时候由NettyRemotingServer的init方法调用:

  1. private void registerProcessor() {
  2. // 1. registry on request message processor
  3. ServerOnRequestProcessor onRequestProcessor =
  4. new ServerOnRequestProcessor(this, getHandler());
  5. super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
  6. super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
  7. super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
  8. super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
  9. super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
  10. super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
  11. super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
  12. super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
  13. super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
  14. // 2. registry on response message processor
  15. ServerOnResponseProcessor onResponseProcessor =
  16. new ServerOnResponseProcessor(getHandler(), getFutures());
  17. super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
  18. super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
  19. // 3. registry rm message processor
  20. RegRmProcessor regRmProcessor = new RegRmProcessor(this);
  21. super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
  22. // 4. registry tm message processor
  23. RegTmProcessor regTmProcessor = new RegTmProcessor(this);
  24. super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
  25. // 5. registry heartbeat message processor
  26. ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
  27. super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
  28. }

上面的代码注册处理器时,都调用了父类的registerProcessor方法:

  1. //第一个参数是消息类型,
  2. //第二个参数是处理器对象,
  3. //第三个参数messageExecutor是线程池,
  4. //如果messageExecutor不为null,那么在处理消息时会在线程池中增加一个处理器处理消息的异步任务,对消息做异步处理,如果messageExecutor为null,那么消息会同步处理
  5. public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
  6. Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
  7. //processorTable的类型是HashMap<Integer, Pair<RemotingProcessor, ExecutorService>>,
  8. //当收到消息的时候,会根据消息类型从processorTable找到对应的处理器和线程池
  9. this.processorTable.put(messageType, pair);
  10. }

处理器一共有五个,每个处理器处理的消息类型如下表:

这五个处理器都实现了接口RemotingProcessor,该接口也非常简单,只有一个process方法:

  1. public interface RemotingProcessor {
  2. void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception;
  3. }

seata的客户端发送消息的时候,会将消息类型添加到消息中,这样接收端在接收到消息后,便可以找到对应的类,解析为对应的消息对象,最后消息对象被转发给AbstractNettyRemoting的processMessage,processMessage里面根据消息对象中的消息类型从processorTable中找到对应的处理器,之后处理器在将消息对象做进一步处理,processMessage方法的代码如下:

  1. //下面将部分代码做了删减
  2. protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  3. Object body = rpcMessage.getBody();
  4. //seata消息都是MessageTypeAware的
  5. if (body instanceof MessageTypeAware) {
  6. MessageTypeAware messageTypeAware = (MessageTypeAware) body;
  7. //根据消息类型从processorTable中找到对应的处理器和线程池
  8. final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
  9. if (pair != null) {
  10. //根据线程池是否为null,处理消息分为同步和异步
  11. if (pair.getSecond() != null) {
  12. try {
  13. pair.getSecond().execute(() -> {
  14. try {
  15. //异步调用处理器的process方法
  16. pair.getFirst().process(ctx, rpcMessage);
  17. } catch (Throwable th) {
  18. LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  19. }
  20. });
  21. } catch (RejectedExecutionException e) {
  22. ...//代码删减
  23. } else {
  24. try {
  25. //同步调用处理器的process方法
  26. pair.getFirst().process(ctx, rpcMessage);
  27. } catch (Throwable th) {
  28. LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  29. }
  30. }
  31. } else {
  32. LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
  33. }
  34. } else {
  35. LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
  36. }
  37. }

下面具体看一下每个处理器的实现。

1、ServerOnRequestProcessor

本处理接收到消息后,进行一些简单的校验,校验通过的消息会转发给协调器DefaultCoordinator处理做进一步处理。

  1. public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  2. //检查当前的channel是否注册过,
  3. //在发起事务请求前,TM和RM需要发送注册信息在服务器端注册,服务器端注册是通过RegRmProcessor和RegTmProcessor处理的
  4. //seata中的连接都是长连接,TM、RM和服务端建立连接后,会一直使用相同的channel通讯
  5. if (ChannelManager.isRegistered(ctx.channel())) {
  6. onRequestMessage(ctx, rpcMessage);
  7. } else {
  8. //如果当前channel没有注册过,seata任务是非法连接,直接关闭该连接。
  9. try {
  10. if (LOGGER.isInfoEnabled()) {
  11. LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
  12. }
  13. ctx.disconnect();
  14. ctx.close();
  15. } catch (Exception exx) {
  16. LOGGER.error(exx.getMessage());
  17. }
  18. if (LOGGER.isInfoEnabled()) {
  19. LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
  20. }
  21. }
  22. }
  23. private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
  24. Object message = rpcMessage.getBody();
  25. //获取RpcContext上下文对象
  26. //RpcContext上下文对象是在TM和RM注册的时候创建的
  27. RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
  28. //下面的if/else分支用于打印日志,从这可以看出,seata的日志打印功能是比较完备的
  29. if (LOGGER.isDebugEnabled()) {
  30. LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message,
  31. NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
  32. } else {
  33. try {
  34. BatchLogHandler.INSTANCE.getLogQueue()
  35. .put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"
  36. + rpcContext.getTransactionServiceGroup());
  37. } catch (InterruptedException e) {
  38. LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
  39. }
  40. }
  41. //消息类型不是AbstractMessage直接丢弃
  42. if (!(message instanceof AbstractMessage)) {
  43. return;
  44. }
  45. //MergedWarpMessage表示当前的消息是一个复合消息,里面包含了多个消息,需要对每个消息进行遍历
  46. if (message instanceof MergedWarpMessage) {
  47. AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];
  48. for (int i = 0; i < results.length; i++) {
  49. final AbstractMessage subMessage = ((MergedWarpMessage) message).msgs.get(i);
  50. //调用协调器DefaultCoordinator处理
  51. results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);
  52. }
  53. MergeResultMessage resultMessage = new MergeResultMessage();
  54. resultMessage.setMsgs(results);
  55. //异步发送响应消息
  56. remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
  57. } else {
  58. // the single send request message
  59. final AbstractMessage msg = (AbstractMessage) message;
  60. //调用协调器DefaultCoordinator处理
  61. AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
  62. //异步发送响应消息
  63. remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
  64. }
  65. }

2、ServerOnResponseProcessor

本处理器的对响应消息的处理分为两部分:一是需要对响应消息做特殊处理的,则通过触发回调逻辑完成对响应消息的处理;二是使用协调器DefaultCoordinator完成处理。

  1. public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  2. //发送的消息如果需要对响应消息做特殊处理的都会记录到futures中,
  3. //这样当响应消息到达时,可以通过MessageFuture对象触发回调逻辑对响应消息处理
  4. MessageFuture messageFuture = futures.remove(rpcMessage.getId());
  5. if (messageFuture != null) {
  6. messageFuture.setResultMessage(rpcMessage.getBody());
  7. } else {
  8. if (ChannelManager.isRegistered(ctx.channel())) {
  9. onResponseMessage(ctx, rpcMessage);
  10. } else {
  11. //如果当前channel没有注册过,seata任务是非法连接,直接关闭该连接。
  12. try {
  13. if (LOGGER.isInfoEnabled()) {
  14. LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
  15. }
  16. ctx.disconnect();
  17. ctx.close();
  18. } catch (Exception exx) {
  19. LOGGER.error(exx.getMessage());
  20. }
  21. if (LOGGER.isInfoEnabled()) {
  22. LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
  23. }
  24. }
  25. }
  26. }
  27. private void onResponseMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
  28. //打印日志使用
  29. if (LOGGER.isDebugEnabled()) {
  30. LOGGER.debug("server received:{},clientIp:{},vgroup:{}", rpcMessage.getBody(),
  31. NetUtil.toIpAddress(ctx.channel().remoteAddress()),
  32. ChannelManager.getContextFromIdentified(ctx.channel()).getTransactionServiceGroup());
  33. } else {
  34. try {
  35. BatchLogHandler.INSTANCE.getLogQueue()
  36. .put(rpcMessage.getBody() + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"
  37. + ChannelManager.getContextFromIdentified(ctx.channel()).getTransactionServiceGroup());
  38. } catch (InterruptedException e) {
  39. LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
  40. }
  41. }
  42. if (rpcMessage.getBody() instanceof AbstractResultMessage) {
  43. //获取RpcContext上下文对象
  44. RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
  45. //调用协调器DefaultCoordinator处理
  46. transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), rpcContext);
  47. }
  48. }

3、RegRmProcessor

本处理器主要是调用ChannelManager.registerRMChannel方法将RM信息注册到服务端。registerRMChannel方法的具体实现逻辑后续文章在做介绍。

  1. public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  2. onRegRmMessage(ctx, rpcMessage);
  3. }
  4. private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
  5. //得到RM注册请求消息
  6. RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
  7. //获得请求方的ip和端口,用于打印日志使用
  8. String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
  9. boolean isSuccess = false;
  10. String errorInfo = StringUtils.EMPTY;
  11. try {
  12. //在本版本里面权限检测处理器checkAuthHandler的功能还不完善,权限检测直接返回true
  13. if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
  14. //RM信息注册到服务器端
  15. ChannelManager.registerRMChannel(message, ctx.channel());
  16. //存储客户端的版本号,从目前的版本代码看,只是做了存储,可能在后续版本会使用
  17. Version.putChannelVersion(ctx.channel(), message.getVersion());
  18. isSuccess = true;
  19. if (LOGGER.isDebugEnabled()) {
  20. LOGGER.debug("checkAuth for client:{},vgroup:{},applicationId:{} is OK", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
  21. }
  22. }
  23. } catch (Exception exx) {
  24. isSuccess = false;
  25. errorInfo = exx.getMessage();
  26. LOGGER.error("RM register fail, error message:{}", errorInfo);
  27. }
  28. //构建响应消息
  29. RegisterRMResponse response = new RegisterRMResponse(isSuccess);
  30. if (StringUtils.isNotEmpty(errorInfo)) {
  31. response.setMsg(errorInfo);
  32. }
  33. //异步发送响应消息
  34. remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
  35. if (LOGGER.isInfoEnabled()) {
  36. LOGGER.info("RM register success,message:{},channel:{},client version:{}", message, ctx.channel(),
  37. message.getVersion());
  38. }
  39. }

4、RegTmProcessor

本处理器主要是调用ChannelManager.registerTMChannel方法将TM信息注册到服务端。registerTMChannel方法的具体实现逻辑后续文章在做介绍。

  1. public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  2. onRegTmMessage(ctx, rpcMessage);
  3. }
  4. private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
  5. //得到TM注册请求消息
  6. RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody();
  7. //获得请求方的ip和端口,用于打印日志使用
  8. String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
  9. //存储客户端的版本号,从目前的版本代码看,只是做了存储,可能在后续版本会使用
  10. Version.putChannelVersion(ctx.channel(), message.getVersion());
  11. boolean isSuccess = false;
  12. String errorInfo = StringUtils.EMPTY;
  13. try {
  14. //在本版本里面权限检测处理器checkAuthHandler的功能还不完善,权限检测直接返回true
  15. if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) {
  16. //TM信息注册到服务器端
  17. ChannelManager.registerTMChannel(message, ctx.channel());
  18. Version.putChannelVersion(ctx.channel(), message.getVersion());
  19. isSuccess = true;
  20. if (LOGGER.isDebugEnabled()) {
  21. LOGGER.debug("checkAuth for client:{},vgroup:{},applicationId:{}",
  22. ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
  23. }
  24. }
  25. } catch (Exception exx) {
  26. isSuccess = false;
  27. errorInfo = exx.getMessage();
  28. LOGGER.error("TM register fail, error message:{}", errorInfo);
  29. }
  30. //构建响应消息
  31. RegisterTMResponse response = new RegisterTMResponse(isSuccess);
  32. if (StringUtils.isNotEmpty(errorInfo)) {
  33. response.setMsg(errorInfo);
  34. }
  35. //异步发送响应消息
  36. remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
  37. if (LOGGER.isInfoEnabled()) {
  38. LOGGER.info("TM register success,message:{},channel:{},client version:{}", message, ctx.channel(),
  39. message.getVersion());
  40. }
  41. }

5、ServerHeartbeatProcessor

心跳处理器的处理逻辑还是非常简单的,收到心跳消息后,直接同步返回响应。

  1. public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  2. try {
  3. remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), HeartbeatMessage.PONG);
  4. } catch (Throwable throwable) {
  5. LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
  6. }
  7. if (LOGGER.isDebugEnabled()) {
  8. LOGGER.debug("received PING from {}", ctx.channel().remoteAddress());
  9. }
  10. }

后面的文章会对协调器和ChannelManager做进一步的分析。

相关文章