链路追踪

文章40 |   阅读 30570 |   点赞0

来源:https://blog.csdn.net/weixin_42073629/category_9940428.html

SkyWalking 源码分析 —— Collector Storage 存储组件

x33g5p2x  于2021-12-21 转载在 其他  
字(11.4k)|赞(0)|评价(0)|浏览(705)

1. 概述

本文主要分享 SkyWalking Collector Storage 存储组件。顾名思义,负责将调用链路、应用、应用实例等等信息存储到存储器,例如,ES 、H2 。
友情提示:建议先阅读 《SkyWalking 源码分析 —— Collector 初始化》 ,以了解 Collector 组件体系。

FROM https://github.com/apache/incubating-skywalking

下面我们来看看整体的项目结构,如下图所示 :

  • apm-collector-core 的 data 和 define  :数据的抽象。
  • collector-storage-define :定义存储组件接口。
  • collector-storage-h2-provider :基于 H2 的 存储组件实现。该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用
  • collector-storage-es-provider :基于 Elasticsearch 的集群管理实现。生产环境推荐使用

下面,我们从接口到实现的顺序进行分享。

2. apm-collector-core

apm-collector-core 的 data 和 define ,如下图所示:

我们对类进行梳理分类,如下图:

  • Table :Data 和 TableDefine 之间的桥梁,每个 Table 定义了该表的表名字段名们
  • TableDefine :Table 的详细定义,包括表名字段定义( ColumnDefine )们。在下文中,StorageInstaller 会基于 TableDefine 初始化表的相关信息。
  • Data :数据,包括一条数据的数据值们和数据字段( Column )们。在下文中,Dao 会存储 Data 到存储器中。另外,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》 中,我们也会看到对 Data 的流式处理通用封装。

2.1 Table

org.skywalking.apm.collector.core.data.CommonTable ,通用表。

  • TABLE_TYPE 静态属性,表类型。目前只有 ES 存储组件使用到,下文详细解析。
  • COLUMN_ 前缀的静态属性,通用的字段名。

在 collector-storage-define 的 table 下,我们可以看到所有 Table 类,以 "Table" 结尾。每个 Table 的表名,在每个实现类里,例如 ApplicationTable 。

2.2 TableDefine

org.skywalking.apm.collector.core.data.TableDefine ,表定义抽象类

  • name 属性,表名。
  • columnDefines 属性,ColumnDefine数组。
  • #initialize() 抽象方法,初始化表定义。例如:ApplicationEsTableDefine 。

不同的存储组件实现,有不同的 TableDefine 实现类,如下图:

  • ElasticSearchTableDefine :基于 Elasticsearch 的表定义抽象类,在 collector-storage-es-provider 的 define 下,我们可以看到所有 ES 的 TableDefine 类。
  • H2TableDefine :基于 H2 的表定义抽象类,在 collector-storage-h2-provider 的 define 下,我们可以看到所有 H2 的 TableDefine 类。

2.2.1 ColumnDefine

org.skywalking.apm.collector.core.data.ColumnDefine ,字段定义抽象类

  • name 属性,字段名。
  • type 属性,字段类型。

在 collector-storage-xxx-provider 模块中,H2ColumnDefine 、ElasticSearchColumnDefine 实现 ColumnDefine 。

2.2.2 Loader

涉及到的类如下图所示:

org.skywalking.apm.collector.core.data.StorageDefineLoader ,调用 org.skywalking.apm.collector.core.define.DefinitionLoader ,从 org.skywalking.apm.collector.core.data.StorageDefinitionFile 中,加载 TableDefine 实现类数组。

另外,在 collector-storage-es-provider 和 collector-storage-h2-provider 里都有 storage.define 文件,如下图:

  • StorageDefinitionFile 声明了读取该文件。
  • 注意,DefinitionLoader 在加载时,两个文件都会被读取,最终在 StorageInstaller#defineFilter(List<TableDefine>) 方法,进行过滤。

代码比较简单,中文注释已加,胖友自己阅读理解下。

2.3 Data

org.skywalking.apm.collector.core.data.Data ,数据抽象类

在 collector-storage-define 的 table 下,我们可以看到所有 Data 类, "Table" 结尾,例如 Application 。

2.3.1 Column

org.skywalking.apm.collector.core.data.Column ,字段。

  • name 属性,字段名。
  • operation 属性,操作( Operation )。

2.3.2 Operation

org.skywalking.apm.collector.core.data.Operation ,操作接口。用于两个值之间的操作,例如,相加等等。目前实现类有:

3. collector-storage-define

collector-cluster-define :定义存储组件接口。项目结构如下 :

3.1 StorageModule

org.skywalking.apm.collector.storage.StorageModule ,实现 Module 抽象类,集群管理 Module 。

#name() 实现方法,返回模块名为 "storage" 。

#services() 实现方法,返回 Service 类名:在 org.skywalking.apm.collector.storage.dao 下的所有类 和 IBatchDAO。

3.2 table 包

在 org.skywalking.apm.collector.storage.table 包下,定义了存储模块所有的 Table 和 Data 实现类。

3.3 StorageInstaller

org.skywalking.apm.collector.storage.StorageInstaller ,存储安装器抽象类,基于 TableDefine ,初始化存储组件的表。

  • #defineFilter(List<TableDefine>) 抽象方法,过滤 TableDefine 数组中,非自身需要的。例如说,ElasticSearchStorageInstaller 过滤后,只保留 ElasticSearchTableDefine 对象。

  • #isExists(Client, TableDefine) 抽象方法,判断表是否存在。

  • #deleteTable(Client, TableDefine) 抽象方法,删除表。

  • #createTable(Client, TableDefine) 抽象方法,创建表。

  • #install(Client) 方法,基于 TableDefine ,初始化存储组件的表。

  • 该方法会被 StorageModuleH2Provider 或 StorageModuleEsProvider 启动时调用。

3.4 dao 包

在 collector-storage-define 项目结构图,我们看到一共有个 bao 包:

  • org.skywalking.apm.collector.storage.base.dao ,系统的 DAO 接口。

  • org.skywalking.apm.collector.storage.dao ,业务的 DAO 接口。

  • 继承系统的 DAO 接口。

  • 被 collector-storage-xxx-provider 的 dao 包实现

3.4.1 系统 DAO

org.skywalking.apm.collector.storage.base.dao.DAO ,继承 Service 接口,DAO 接口

无任何方法。

3.4.1.1 AbstractDAO

org.skywalking.apm.collector.storage.base.dao.AbstractDAO ,实现 DAO 接口,DAO 抽象基类。

  • client 属性,数据操作客户端。例如,H2Client 、ElasticSearchClient 。

在 collector-storage-xxx-provider 模块中,H2DAO 、EsDAO 实现 AbstractDAO 。

3.4.1.2 IPersistenceDAO

org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO ,实现 DAO 接口,持久化 DAO 接口,定义了 Data 的增删改查操作。

  • #get(id) 接口方法,根据 ID 查询一条 Data 。

  • #deleteHistory(startTimestamp, endTimestamp) 接口方法,删除时间范围内的 Data 们。

  • #prepareBatchInsert(data) 接口方法,准备批量插入操作对象。例如:CpuMetricEsPersistenceDAO#prepareBatchInsert(CpuMetric) 方法,返回的是 org.elasticsearch.action.index.IndexRequestBuilder 对象。注意:

  • 该方法不会发起具体的 DAO 操作,仅仅是创建插入操作对象,最终的执行在 IBatchDAO#batchPersistence(List<?>)

  • 该方法创建的是批量插入操作对象们中的一个。

  • #prepareBatchUpdate(data) 接口方法,准备批量更新操作对象。类似 #prepareBatchInsert(data) 方法。

3.4.1.3 IBatchDAO

org.skywalking.apm.collector.storage.base.dao.IBatchDAO ,实现 DAO 接口,批量操作 DAO 接口

在 collector-storage-xxx-provider 模块中,BatchH2DAO 、BatchEsDAO 实现 IBatchDAO 。

3.4.2 业务 DAO

在 StorageModule#services() 方法里,我们可以看到,业务 DAO 按照用途可以拆分成四种

  • Cache :缓存应用、应用实例、服务名
  • Register :注册应用、应用实例、服务名
  • Persistence :持久化,实际可以理解成批量持久化
  • UI :SkyWaling UI 查询使用。

那么整理如下:

PackageDataCache / RegisterPersistenceUI关联文章
registerApplication
registerInstance
registerServiceName
jvmCpuMetric
jvmCMetric
jvmMemoryMetric
jvmMemoryPoolMetric
globalGlobalTrace
instanceInstPerformance
nodeNodeComponent
nodeNodeMapping
noderefNodeReference
segmentSegmentCost
segmentSegment
serviceServiceEntry
servicerefServiceReference

4. collector-storage-h2-provider

collector-storage-h2-provider ,基于 H2 的存储组件实现。项目结构如下 :

该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用

由于生产环境主要使用 ES 的存储组件实现,所以本文暂不解析相关实现,感兴趣的胖友自己嗨起来。

5. collector-storage-es-provider

collector-storage-es-provider ,基于 ES 的存储组件实现。项目结构如下 :

实际使用时,通过 application.yml 配置如下:

|

storage:
  elasticsearch:
    cluster_name: elasticsearch
    cluster_transport_sniffer: true
    cluster_nodes: 127.0.0.1:9300
    index_shards_number: 2
    index_replicas_number: 0
    ttl: 7

|

  • 生产环境下,推荐 Elasticsearch 配置成集群。

  • cluster_name 、cluster_transport_sniffer 、cluster_nodes 、index_shards_number 、index_replicas_number 参数,Elasticsearch 相关参数。

  • ttl :保留 N 天内的数据。超过 N 天的数据,将被自动滚动删除。

  • 该功能目前版本暂未发布,需要等到 5.0 版本后。

  • 《部署集群collector》

5.1 StorageModuleEsProvider

org.skywalking.apm.collector.storage.es.StorageModuleEsProvider ,实现 ModuleProvider 抽象类,基于 ES 的存储组件服务提供者。

#name() 实现方法,返回组件服务提供者名为 "elasticsearch" 。

module() 实现方法,返回组件类为 StorageModule 。

#requiredModules() 实现方法,返回依赖组件为 "cluster" 。

#prepare(Properties) 实现方法,执行准备阶段逻辑。

#start() 实现方法,执行启动阶段逻辑。

#notifyAfterCompleted() 实现方法,执行启动完成逻辑。

  • 第 115 行 :调用 DataTTLKeeperTimer#start() 方法,启动 DataTTLKeeperTimer 。在本文 「5.4 DataTTLKeeperTimer」 详细解析。

5.2 define 包

在 collector-storage-es-provider 项目结构图,我们看到一共有个 define 包:

  • org.skywalking.apm.collector.storage.es.base.define ,系统的 TableDefine 抽象类。

  • org.skywalking.apm.collector.storage.es.define ,业务的 TableDefine 实现类。

  • 继承系统的 TableDefine 抽象类。

5.2.1 ElasticSearchTableDefine

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine ,实现 TableDefine 接口,基于 Elasticsearch 的表定义抽象类

  • #type() 方法,文档元数据 _type 字段,参见 《Elasticsearch学习笔记》「_type」 。
  • #refreshInterval() 抽象方法,文档索引刷新频率,参见 《Elasticsearch: 权威指南 » 基础入门 » 分片内部原理 » 近实时搜索》「refresh API」。

5.2.2 ElasticSearchColumnDefine

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine ,实现 ColumnDefine 抽象类,基于 ES 的字段定义。

  • Type 枚举类:枚举 ES 字段类型。

5.2.3 业务 TableDefine 实现类

在 org.apache.skywalking.apm.collector.storage.es.define 里,我们可以看到,所有基于 ES 的业务 TableDefine 实现类。例如:ApplicationEsTableDefine 。

整体 #refreshInterval() 方法返回的结果如下:

  • 1 s

  • CpuMetricEsTableDefine

  • GCMetricEsTableDefine

  • MemoryMetricEsTableDefine

  • MemoryPoolMetricEsTableDefine

  • 2 s

  • InstPerformanceEsTableDefine

  • NodeComponentEsTableDefine

  • NodeMappingEsTableDefine

  • NodeReferenceEsTableDefine

  • ServiceEntryEsTableDefine

  • ServiceReferenceEsTableDefine

  • 2 s && WriteRequest.RefreshPolicy.IMMEDIATE

  • 【WriteRequest.RefreshPolicy.IMMEDIATE】参见 ApplicationEsRegisterDAO#save(Application) 方法

  • ApplicationEsTableDefine

  • InstanceEsTableDefine

  • ServiceNameEsTableDefine

  • 5 s

  • GlobalTraceEsTableDefine

  • SegmentCostEsTableDefine

  • 10 s

  • SegmentEsTableDefine

5.2.4 ElasticSearchStorageInstaller

友情提示:ElasticSearchStorageInstaller 主要是对 Elasticsearch Java API 的使用,所以不熟悉的胖友,可以 Google 下。

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller ,实现 StorageInstaller 抽象类, 基于 ES 存储安装器实现类。

  • #defineFilter(List<TableDefine>) 实现方法,过滤数组中,非 ElasticSearchTableDefine 的元素。

  • #createTable(Client, TableDefine) 实现方法,创建 Elasticsearch 索引。

  • 文档数据结构如下:

  • _id :数据编号,String 类型。

  • _type :"type" 。

  • _index :TableDefine 定义的表名

  • source :Data 数据。

  • 了解 Elasticsearch 的胖友可能有和笔者一样的疑惑,网络上很多文章把 _index 类比成关系数据库的 DB ,_type 类比成关系数据库的 Table ,和 SkyWalking 目前使用的方式不一致

  • SkyWalking 彭勇升 :_index和 _type 是 ES 特有的,考虑其他数据库接入,所以没有用他这个特性。

  • SkyWalking QQ交流群( 392443393 ) ,小心 群友 :_type 本来就没做物理隔离,Lucene 层面也不存在,ES 6.x 已经废弃了。

  • 《Elasticsearch 6.0 将移除 Type》

  • #deleteTable(Client, TableDefine) 实现方法,删除 Elasticsearch 索引。

  • #isExists(Client, TableDefine) 实现方法,判断 Elasticsearch 索引是否存在。

  • 在方法里,笔者添加了一些 API 的说明,不熟悉的胖友,可以仔细阅读理解。

5.3 dao 包

在 collector-storage-es-provider 项目结构图,我们看到一共有个 dao 包:

  • org.skywalking.apm.collector.storage.es.base.dao ,系统的 DAO 抽象类。

  • org.skywalking.apm.collector.storage.es.dao ,业务的 DAO 实现类。

  • 继承系统的 DAO 抽象类。

5.3.1 EsDAO

org.skywalking.apm.collector.storage.es.base.dao.EsDAO ,实现 AbstractDAO 抽象类,基于 ES 的 DAO 抽象类

  • #getMaxId(indexName, columnName) 方法,获得索引名的指定字段的最大值
  • #getMinId(indexName, columnName) 方法,获得索引名的指定字段的最小值

5.3.2 BatchEsDAO

org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO ,实现 IBatchDAO 接口,继承 EsDAO 抽象类,基于 ES 批量操作 DAO 实现类。

  • #batchPersistence(List<?>) 实现方法,将 org.elasticsearch.action.index.IndexRequestBuilder 和 org.elasticsearch.action.index.UpdateRequestBuilder 数组,创建成 org.elasticsearch.action.bulk.BulkRequestBuilder 对象,批量持久化。

  • IndexRequestBuilder 和 UpdateRequestBuilder 的创建,在 「5.3.3 业务 DAO 实现类」 会看到。

5.3.3 业务 DAO 实现类

在 org.apache.skywalking.apm.collector.storage.es.dao 里,我们可以看到,所有基于 ES 的业务 DAO 实现类。

实现代码易懂,胖友可以自己阅读。良心如我们,按照 DAO 的业务用途,推荐例子如下:

5.4 DataTTLKeeperTimer

org.skywalking.apm.collector.storage.es.DataTTLKeeperTimer ,过期数据删除定时器。通过该定时器,只保留 N 天内的数据。

  • #start() 方法,启动定时任务。

  • 第 49 行:创建延迟 1 小时,每 8 小时执行一次 #delete() 方法的定时任务。目前该行代码被注释,胖友可以等待 SkyWallking 5.0 版本的发布。

  • #delete() 方法,删除过期数据。

  • 第 54 至 66 行:计算删除的开始与结束时间,即指定时间的前一天。例如,2017-12-23 执行时,删除 2017-12-16 那天的数据。

  • 第 69 行:调用 #deleteJVMRelatedData(startTimestamp, endTimestamp) 方法,删除 JVM 相关的数据。

  • 第 70 行:调用 #deleteTraceRelatedData(startTimestamp, endTimestamp) 方法,删除 Trace 相关的数据。

如下是不会删除的数据的表:

  • Application
  • Instance
  • ServiceName
  • ServiceEntry

相关文章