Storm是一个分布式的实时计算框架,可以很方便地对流式数据进行实时处理和分析,能运用在实时分析、在线数据挖掘、持续计算以及分布式 RPC 等场景下。Storm 的实时性可以使得数据从收集到处理展示在秒级别内完成,从而为业务方决策提供实时的数据支持。
在美团点评公司内部,实时计算主要应用场景包括实时日志解析、用户行为分析、实时消息推送、消费趋势展示、实时新客判断、实时活跃用户数统计等。这些数据提供给各事业群,并作为他们实时决策的有力依据,弥补了离线计算“T+1”的不足。
在实时计算中,用户不仅仅关心时效性的问题,同时也关心消息处理的成功率。本文将通过实验验证 Storm 的消息可靠性保证机制,文章分为消息保证机制、测试目的、测试环境、测试场景以及总结等五节。
Storm 提供了三种不同层次的消息保证机制,分别是 At Most Once、At Least Once 以及 Exactly Once。消息保证机制依赖于消息是否被完全处理。
每个从 Spout(Storm 中数据源节点)发出的 Tuple(Storm 中的最小消息单元)可能会生成成千上万个新的 Tuple,形成一棵 Tuple 树,当整棵 Tuple 树的节点都被成功处理了,我们就说从 Spout 发出的 Tuple 被完全处理了。 我们可以通过下面的例子来更好地诠释消息被完全处理这个概念:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KafkaSpout(spoutConfig), spoutNum);
builder.setBolt("split", new SplitSentence(), 10)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
.fieldsGrouping("split", new Fields("word"));
这个 Topology 从 Kafka(一个开源的分布式消息队列)读取信息发往下游,下游的 Bolt 将收到的句子分割成单独的单词,并进行计数。每一个从 Spout 发送出来的 Tuple 会衍生出多个新的 Tuple,从 Spout 发送出来的 Tuple 以及后续衍生出来的 Tuple 形成一棵 Tuple 树,下图是一棵 Tuple 树示例:
上图中所有的 Tuple 都被成功处理了,我们才认为 Spout 发出的 Tuple 被完全处理。如果在一个固定的时间内(这个时间可以配置,默认为 30 秒),有至少一个 Tuple 处理失败或超时,则认为整棵 Tuple 树处理失败,即从 Spout 发出的 Tuple 处理失败。
<img src="img/storm_tuple_guarantee.jpg" style="zoom:67%;" />
Tuple 的完全处理需要 Spout、Bolt 以及 Acker(Storm 中用来记录某棵 Tuple 树是否被完全处理的节点)协同完成,如上图所示。从 Spout 发送 Tuple 到下游,并把相应信息通知给 Acker,整棵 Tuple 树中某个 Tuple 被成功处理了都会通知 Acker,待整棵 Tuple 树都被处理完成之后,Acker 将成功处理信息返回给 Spout;如果某个 Tuple 处理失败,或者超时,Acker 将会给 Spout 发送一个处理失败的消息,Spout 根据 Acker 的返回信息以及用户对消息保证机制的选择判断是否需要进行消息重传。
Storm 提供的三种不同消息保证机制中。利用 Spout、Bolt 以及 Acker 的组合我们可以实现 At Most Once 以及 At Least Once 语义,Storm 在 At Least Once 的基础上进行了一次封装(Trident),从而实现 Exactly Once 语义。
Storm 的消息保证机制中,如果需要实现 At Most Once 语义,只需要满足下面任何一条即可:
如果需要实现 At Least Once 语义,则需要同时保证如下几条:
实现 Exactly Once 语义,则需要在 At Least Once 的基础上进行状态的存储,用来防止重复发送的数据被重复处理,在 Storm 中使用 Trident API 实现。
下图中,每种消息保证机制中左边的字母表示上游发送的消息,右边的字母表示下游接收到的消息。从图中可以知道,At Most Once 中,消息可能会丢失(上游发送了两个 A,下游只收到一个 A);At Least Once 中,消息不会丢失,可能重复(上游只发送了一个B ,下游收到两个B);Exactly Once 中,消息不丢失、不重复,因此需要在 At Least Once 的基础上保存相应的状态,表示上游的哪些消息已经成功发送到下游,防止同一条消息发送多次给下游的情况。
Storm 官方提供 At Most Once、At Least Once 以及 Exactly Once 三种不同层次的消息保证机制,我们希望通过相关测试,达到如下目的:
本文的测试环境如下: 每个 worker(worker 为一个 物理 JVM 进程,用于运行实际的 Storm 作业)分配 1 CPU 以及 1.6G 内存。Spout、Bolt、Acker分别跑在单独的 worker 上。并通过在程序中控制抛出异常以及人工 Kill Spout/Bolt/Acker 的方式来模拟实际情况中的异常情况。
三种消息保证机制的测试均由 Spout 从 Kafka 读取测试数据,经由相应 Bolt 进行处理,然后发送到 Kafka,并将 Kafka 上的数据同步到 MySQL 方便最终结果的统计。
测试数据为 Kafka 上顺序保存的一系列纯数字,数据量分别有十万、五十万、一百万等,每个数字在每个测试样例中出现且仅出现一次。
对于三种不同的消息保证机制,我们分别设置了不同的测试场景,来进行充分的测试。其中为了保证 Spout/Bolt/Acker 发生异常的情况下不影响其他节点,在下面的测试中,所有的节点单独运行在独立的 Worker 上。
从背景中可以得知,如果希望实现 At Most Once 语义,将 Acker 的数目设置为 0 即可,本文的测试过程中通过把设置 Acker 为 0 来进行 At Most Once 的测试。
保存在 Kafka 上的一系列纯数字,数据量从十万到五百万不等,每个测试样例中,同一个数字在 Kafka 中出现且仅出现一次。
异常次数 | 测试数据总量 | 结果集中不同 Tuple 的总量 | 丢失的 Tuple 数据量 | Tuple 的丢失百分比 | Tuple 的重复量 |
---|---|---|---|---|---|
0 | 500000 | 500000 | 0 | 0% | 0 |
0 | 1000000 | 1000000 | 0 | 0% | 0 |
0 | 2000000 | 2000000 | 0 | 0% | 0 |
0 | 3000000 | 3000000 | 0 | 0% | 0 |
异常次数 | 测试数据总量 | 结果集中不同 Tuple 的总量 | 丢失的 Tuple 数据量 | Tuple 的丢失百分比 | Tuple 的重复量 |
---|---|---|---|---|---|
1 | 3000000 | 2774940 | 225060 | 7.50% | 0 |
2 | 3000000 | 2307087 | 692913 | 23.09% | 0 |
3 | 3000000 | 2082823 | 917177 | 30.57% | 0 |
4 | 3000000 | 1420725 | 1579275 | 52.64% | 0 |
不发生异常的情况下,消息能够不丢不重;Bolt 发生异常的情况下,消息会丢失,不会重复,其中消息的丢失数目与异常次数正相关。与官方文档描述相符,符合预期。
为了实现 At Least Once 语义,需要 Spout、Bolt、Acker 进行配合。我们使用 Kafka-Spout 并通过自己管理 offset 的方式来实现可靠的 Spout;Bolt 通过继承 BaseBasicBolt,自动帮我们建立 Tuple 树以及消息处理之后通知 Acker;将 Acker 的数目设置为 1,即打开 ACK 机制,这样整个 Topology 即可提供 At Least Once 的语义。
Kafka 上保存的十万到五十万不等的纯数字,其中每个测试样例中,每个数字在 Kafka 中出现且仅出现一次。
Acker 发生异常的情况
异常的次数 | 测试数据总量 | 结果集中不重复的 Tuple 数 | 数据重复的次数(>1) | 出现重复的 Tuple 数 | 数据丢失数量 | 最大积压量 |
---|---|---|---|---|---|---|
0 | 100000 | 100000 | - | - | 0 | 2000(默认值) |
0 | 200000 | 200000 | - | - | 0 | 2000 |
0 | 300000 | 300000 | - | - | 0 | 2000 |
0 | 400000 | 400000 | - | - | 0 | 2000 |
异常的次数 | 测试数据总量 | 结果集中不重复的 Tuple 数 | 数据重复的次数(>1) | 出现重复的 Tuple 数 | 数据丢失数量 | 最大积压量 |
---|---|---|---|---|---|---|
1 | 100000 | 100000 | 2 | 2000 | 0 | 2000 |
2 | 100000 | 100000 | 2 | 4001 | 0 | 2000 |
3 | 100000 | 100000 | 2 | 6000 | 0 | 2000 |
4 | 100000 | 100000 | 2 | 8000 | 0 | 2000 |
Spout 发生异常的情况
异常的次数 | 测试数据总量 | 结果集中不重复的 Tuple 数 | 数据重复的次数(>1) | 出现重复的 Tuple 数 | 数据丢失数量 |
---|---|---|---|---|---|
0 | 100000 | 100000 | - | - | 0 |
0 | 200000 | 200000 | - | - | 0 |
0 | 300000 | 300000 | - | - | 0 |
0 | 400000 | 400000 | - | - | 0 |
异常的次数 | 测试数据总量 | 结果集中不重复的 Tuple 数 | 数据重复的次数(>1) | 出现重复的 Tuple 数 | 数据丢失数量 |
---|---|---|---|---|---|
1 | 100000 | 100000 | 2 | 2052 | 0 |
2 | 100000 | 100000 | 2 | 4414 | 0 |
4 | 100000 | 100000 | 2 | 9008 | 0 |
6 | 100000 | 100000 | 2 | 9690 | 0 |
Bolt 发生异常的情况
调用 emit 函数之前发生异常
异常的次数 | 测试数据总量 | 结果集中不重复的 Tuple 数 | 数据重复的次数(>1) | 出现重复的 Tuple 数 | 数据丢失数量 |
---|---|---|---|---|---|
0 | 100000 | 100000 | - | - | 0 |
0 | 200000 | 200000 | - | - | 0 |
0 | 300000 | 300000 | - | - | 0 |
0 | 400000 | 400000 | - | - | 0 |
异常的次数 | 测试数据总量 | 结果集中不重复的 Tuple 数 | 数据重复的次数(>1) | 出现重复的 Tuple 数 | 数据丢失数量 |
---|---|---|---|---|---|
1 | 100000 | 100000 | - | - | 0 |
2 | 200000 | 200000 | - | - | 0 |
4 | 300000 | 300000 | - | - | 0 |
8 | 400000 | 400000 | - | - | 0 |
10 | 400000 | 400000 | - | - | 0 |
调用 emit 函数之后发生异常
异常的次数 | 测试数据总量 | 结果集中不重复的 Tuple 数 | 数据重复的次数(>1) | 出现重复的 Tuple 数 | 数据丢失数量 |
---|---|---|---|---|---|
0 | 100000 | 100000 | - | - | 0 |
0 | 200000 | 200000 | - | - | 0 |
0 | 300000 | 300000 | - | - | 0 |
0 | 400000 | 400000 | - | - | 0 |
异常的次数 | 测试数据总量 | 结果集中不重复的 Tuple 数 | 数据重复的次数(>1) | 出现重复的 Tuple 数 | 数据丢失数量 |
---|---|---|---|---|---|
1 | 100000 | 100000 | 2 | 2 | 0 |
2 | 200000 | 200000 | 2 | 3 | 0 |
4 | 300000 | 300000 | 2 | 5 | 0 |
8 | 400000 | 400000 | 2 | 9 | 0 |
10 | 400000 | 400000 | 2 | 11 | 0 |
从上面的表格中可以得到,消息不会丢失,可能发生重复,重复的数目与异常的情况相关。
结论与官方文档所述相符,每条消息至少发送一次,保证数据不会丢失,但可能重复,符合预期。
对于 Exactly Once 的语义,利用 Storm 中的 Trident 来实现。
Kafka 上保存的一万到一百万不等的数字,每个数字在每次测试样例中出现且仅出现一次。
Spout 发生异常情况
异常数 | 测试数据量 | 结果集中不重复的 Tuple 数 | 结果集中所有 Tuple 的总和 |
---|---|---|---|
1 | 10000 | 10000 | 50005000 |
2 | 10000 | 10000 | 50005000 |
3 | 10000 | 10000 | 50005000 |
Acker 发生异常的情况
异常数 | 测试数据量 | 结果集中不重复的 Tuple 数 | 结果集中所有 Tuple 的总和 |
---|---|---|---|
1 | 10000 | 10000 | 50005000 |
2 | 10000 | 10000 | 50005000 |
3 | 10000 | 10000 | 50005000 |
Bolt发生异常情况
异常数 | 测试数据量 | 结果集中不重复的 Tuple 数 | 结果集中所有 Tuple 的总和 |
---|---|---|---|
1 | 10000 | 10000 | 50005000 |
2 | 10000 | 10000 | 50005000 |
3 | 10000 | 10000 | 50005000 |
在所有情况下,最终结果集中的消息不会丢失,不会重复,与官方文档中的描述相符,符合预期。
对 Storm 提供的三种不同消息保证机制,用户可以根据自己的需求选择不同的消息保证机制。
对于 Storm 提供的三种消息可靠性保证,优缺点以及使用场景如下所示:
可靠性保证层次 | 优点 | 缺点 | 使用场景 |
---|---|---|---|
At most once | 处理速度快 | 数据可能丢失 | 都处理速度要求高,且对数据丢失容忍度高的场景 |
At least once | 数据不会丢失 | 数据可能重复 | 不能容忍数据丢失,可以容忍数据重复的场景 |
Exactly once | 数据不会丢失,不会重复 | 处理速度慢 | 对数据不丢不重性质要求非常高,且处理速度要求没那么高,比如支付金额 |
对于 At Least Once 的保证需要做如下几步:
不满足以上三条中任意一条的都只提供 At Most Once 的消息可靠性保证,如果希望得到 Exactly Once 的消息可靠性保证,可以使用 Trident 进行实现。
内容来源于网络,如有侵权,请联系作者删除!