链路追踪

文章40 |   阅读 30621 |   点赞0

来源:https://blog.csdn.net/weixin_42073629/category_9940428.html

SkyWalking 源码分析 —— Agent Remote 远程通信服务

x33g5p2x  于2021-12-21 转载在 其他  
字(2.0k)|赞(0)|评价(0)|浏览(545)

1. 概述

本文主要分享 SkyWalking Agent Remote 远程通信服务。该服务用于 Agent 和Collector 集群的通信。

在 《SkyWalking 源码分析 —— Collector Naming Server 命名服务》 一文中,我们已经看到,Agent 使用定时轮询,从 Collector Naming Server 中,获得 Collector 集群的 Collector Agent gRPC Server 的所有地址

2. GRPCChannelManager

org.skywalking.apm.agent.core.remote.GRPCChannelManager ,实现 BootService 、Runnable 接口,gRPC Channel 管理器。GRPCChannelManager 负责管理与 Collector Agent gRPC Server 集群的连接的管理提供给其他服务使用

  • managedChannel 属性,连接 gRPC Server 的 Channel 。同一时间,GRPCChannelManager 只连接一个 Collector Agent gRPC Server 节点,并且在 Channel 不因为各种网络问题断开的情况下,持续保持
  • connectCheckFuture 属性,定时重连 gRPC Server 的定时任务
  • reconnect 属性,是否重连。当 Channel 未连接需要连接,或者 Channel 断开需要重连时,标记 reconnect = true 。后台线程会根据该标识进行连接( 重连 )。
  • listeners 属性,监听器( org.skywalking.apm.agent.core.remote.GRPCChannelListener ) 数组。使用 Channel 的其他服务,注册监听器到 GRPCChannelManager 上,从而根据连接状态( org.skywalking.apm.agent.core.remote.GRPCChannelStatus ),实现自定义逻辑。

#boot() 实现方法,调用 ScheduledExecutorService#scheduleAtFixedRate(...) 方法,创建定时任务。该定时任务无初始化延迟,每 Config.GRPC_CHANNEL_CHECK_INTERVAL ( 默认:30 s ) 执行一次 #run() 方法。

#run() 实现方法,执行 Channel 的连接( 重连 )逻辑。代码如下:

  • 第 99 行:当 reconnect = true 时,才执行连接( 重连 )。
  • 第 100 行:当本地已经获取到 Collector Agent gRPC Server 集群地址时,参见 《SkyWalking 源码分析 —— Collector Naming Server 命名服务》 。
  • 第 101 至 106 行:随机选择准备链接的 Collector Agent gRPC Server 。
  • 第 107 至 113 行:创建 Channel 并进行连接。此处主要是 gRPC 的 API 使用,不熟悉的胖友,请 Google 下进行了解了解。
  • 第 115 至 117 行:连接成功,标记 reconnect = false ,这样,下次执行 #run() 方法不会重连。而后,调用 #notify(GRPCChannelStatus.CONNECTED) 方法,通知监听器连接成功。
  • 第 118 至 121 行:连接成功,不标记 reconnect ,这样,下次执行 #run() 方法会继续重连。而后,调用 #notify(GRPCChannelStatus.DISCONNECT) 方法,通知监听器连接处于断开状态。
  • 第 124 至 126 行:连接异常,不标记 reconnect ,这样,下次执行 #run() 方法会继续重连。而后,调用 #notify(GRPCChannelStatus.DISCONNECT) 方法,通知监听器连接处于断开状态。

实际使用中,Channel 可能因为各种原因断开,那么 GRPCChannelManager 是怎么检测的呢?在使用 Channel 的其他服务,当使用 Channel 时发生异常,调用 #reportError(Throwable) 方法,判断是否为网络异常( #isNetworkError(Throwable) ) 。若是,标记 reconnect = true ,等待后台进行重连。

3. GRPCChannelListener

org.skywalking.apm.agent.core.remote.GRPCChannelListener ,gRPC Channel 的监听器接口,定义了 #statusChanged(GRPCChannelStatus) ,通知 gRPC Channel 状态变更。

GRPCChannelListener 实现类如下图,后续文章会详细解析。

相关文章