流式数据在实际应用中非常常见,典型的流式数据包括点击日志、监控指标数据、搜索日志等。流式数据往往伴随实时计算需求,即对流式数据进行实时分析,以便尽可能快速地获取有价值的信息。在大数据领域,将针对流式数据进行实时分析的计算引擎称为流式实时计算引擎。这类引擎最大的特点是延迟低,即从数据产生到最终处理完成,整个过程用时极短,往往是毫秒级或秒级处理延迟。与批处理计算引擎类似,流式实时计算引擎也需具有良好的容错性、扩展性和编程简易性等特点。目前常用的流式实时计算引擎分为两类:面向行(row-based)和面向微批处理(micro-batch)。其中,面向行的流式实时计算引擎的代表是Apache Storm,其典型特点是延迟低,但吞吐率也低;而面向微批处理的流式实时计算引擎的代表是Spark Streaming,其典型特点是延迟高,但吞吐率也高。
一个流式计算过程可用图13-1所示概括,一条消息(msg1)到达后,依次经若干用户实现逻辑处理后,将最终结果写入外部系统。每条消息经用户逻辑处理后,会衍生出新的消息(比如msg1衍生出msg2或msg3)。而对于流式计算而言,应保证消息的可靠性:每条消息进入系统后,可以依次完整经历用户定义的逻辑,最终产生期望的结果,而不应因任意故障导致消息处理中断后致使消息处理不完整。
图13-1 流式计算过程
传统的流式计算平台是通过“消息队列+工作进程”组合方式构建的,具体如图13-2所示。流式数据到达系统后:
图13-2 传统流式计算平台
这类系统能够解决流式数据处理的问题,但是存在以下几个缺点:
为了克服传统消息队列系统的不足,新型流式计算引擎诞生了。这类计算引擎为用户提供了简易的编程接口,用户可通过实现这些编程接口即可完成分布式流式应用程序的开发,而其他比较复杂的工作,如节点间的通信、节点失效、数据分片、系统扩展等,全部由运行时环境完成,用户无需关心这些细节。
当前比较主流的流式数据线(Data Pipeline)共分为四步:
(1)数据采集:该阶段主要负责从不同的数据源上实时采集数据,典型的数据源包括移动客户端,网站后端等,通常根据后端数据缓存模块不同,选用不同的实现方案,可选的包括Flume以及自定义Kafka Producer。
(2)数据缓冲:为了平衡数据采集和数据处理速率的不对等,通常数据采集阶段和处理阶段之间加入一个数据缓冲阶段,通过由消息队列担任该角色,比如:Kafka
(3)实时分析:流式地从数据缓冲区获取数据,并快速完成数据处理,将结果写到后端的存储系统中。根据系统对延迟和吞吐率的要求不同,可选用不同的流式计算引擎,比如Storm或SparkStreaming。
(4)结果存储:将计算产生的结果存储到外存储系统中,根据应用场景不同,可选择不同的存储系统,比如大量可实时查询的系统,可存储到HBase中,小量但需可高并发查询的系统,可存入Redis中。
根据流式计算引擎的数据组织特点,可将其分为两类:基于行(row based)和基于微批处理(micro-batch based)。基于行的流式实时处理系统以行为单位处理数据,其主要优点是单条数据的处理延迟低,但系统吞吐率一般也较低,其典型代表是Apache Storm;基于微批处理的流式实时处理系统则将流式处理转化为批处理,即以批为单位组织数据,它通常以时间为单位将流式数据切割成连续的批数据,并通过批处理的方式处理每批数据,这类系统的优点是吞吐率高,而缺点也很明显:单条数据处理延迟较高,其典型代表是Spark Streaming。
本节将介绍Storm的基本概念、软件架构、程序设计方法以及内部原理。
本小节将介绍Storm基本概念,包括:Tuple、Stream、Topology、Bolt和Spout。
1. 概念
Storm中核心概念如下:
在一个Topology中,每个Spout或Blot通常由多个Task组成,每个Spout和Blot的Task相互独立,可以并行执行。如图13-4所示,可类比MapReduce中的job:一个MapReduce Job可看作一个两阶段的DAG,其中Map阶段可分解成多个Map Task, Reduce阶段可分解成多个Reduce Task,相比之下,Storm Topology是一个更加通用的DAG,可以有多个Spout和Blot阶段,每个阶段可进一步分解成多个Task。
图13-4 Storm的Topology构成
Streaming Grouping:Stream Grouping决定了Topology中Tuple在不同Task之间的传递方式。Storm主要提供了多种Stream Grouping实现,常用的有:
(1)Shuffle Grouping:随机化的轮询方式,即Task产生的Tuple将采用轮询方式发送给下一类组件的Task;
(2)LoadOrShuffle Grouping:经优化的Shuffle Grouping实现,它使得同一Worker内部的Task优先将Tuple传递给同Worker的其他Task。
(3)Fields Grouping:某个字段值相同的Tuple将被发送给同一个Task,类似于MapReduce或Spark中的Shuffle实现。
2. Storm基本架构
一个Storm集群由三类组件构成:Nimbus、Supervistor和ZooKeeper。,如图13-5所示,它们的功能如下:
图13-5 Storm架构
3. Topology并发度
一个Storm的Topology的并发度与Worker、Executor和Task三种实体的数目相关,用户可根据需要为Topology定制每种实体的数目。需要注意的是,这些实体的并发度也被称为“parallelism hint”,它们的数值只是初始值,而后续可根据需求进一步进行调整。三种实体的并发度设置方式具体如下:
以下代码创建了一个Storm Topology,包含KafkaSpout、SplitBolt以及MergeBolt三类组件,并为每类组件设置了Executor和Task数目,该代码对应的Topology运行时环境如图13-6所示。
Config conf = new Config();
conf.setNumWorkers(2);
topologyBuilder.setSqout("kafka-spout",new KafkaSpout(), 2);
topologyBuilder.setBolt("split-bolt", new SplitBolt(), 2)// 为SplitBolt启动两个Executor线程
.setNumTasks(4) // 设置Task数目为4
.shuffleGrouping("kafka-spout");
topologyBuilder.setBolt("merge-bolt", new MergeBolt(), 6)// 为SplitBolt启动6个Executor线程
.shuffleGrouping("split-bolt");
StormSubmitter.submitToplogy("mytopologgy", conf, topologgyBuilder.createTopology());
图13-6 一个Storm Topology的运行时环境
一旦Topology运行起来后,用户可通过Web UI或Shell命令动态修改Topology的并发度,比如以下Shell命令将以上Topology的Worker数目增大为4, kafka-spout Executor数目增大为4,merge-bolt Executor数目增大为8:
storm rebalance mytopology -n 4 -e kafka-spout=4 -e merge-bolt=8
本节将介绍简化版的Storm程序设计实例:网站指标实时分析系统。在该系统中,用户行为数据(日志)被源源不断地发送到Kafka集群中,之后经Storm Topology处理后,写入HBase中,以供可视化模块展示实时统计系统,包括网站的PⅤ(Page Ⅴiew)和UⅤ(Unique visitor)等信息。
在该系统中,我们采用JSON格式保存用户访问日志,每条日志包含三个字段,分别是客户端地址(ip)、访问时间(timestamp),访问的链接(url),以下是几条日志数据的示例:
{"ip":"10.10.10.1", "timestamp":"20170822132730", "url":"http://dongxicheng.org/mapreduce-nextgen/voidbox-docker-on-hadoop-hulu"}
{"ip":"112.156.10.1", "timestamp":"20170822132730", "url":"http://dongxicheng.org/framework-on-yarn/hadoop-spark-common-parameters/"}
{"ip":"150.110.103.5", "timestamp":"20170822132732","url":"http://dongxicheng.org/mapreduce-nextgen/yarn-mesos-borg/"}
为了实现网站指标的实时统计,我们可以设计如图13-7所示的Storm Topology。
图13-7 网站指标实时分析系统的Storm Topology
该Topology包含1个Spout和3个Bolt,它们的作用如下:
创建Topology主要代码如下:
TopologyBuilder builder = new TopologyBuilder();
// 定义时间窗口大小
BaseWindowedBolt.Duration duration = BaseWindowedBolt.Duration.minutes(10);
// 构建Topology,依次设置KafkaSpout、ParseBolt、ComputeBolt、HBaseBolt
builder.setSpout("kafka-bolt", new KafkaBolt(spoutCOnf), 1)
builder.setBolt("parse-bolt", new ParseBolt(), 4).setShuffleGrouping("kafka-spout");
builder.setBolt("calculate-bolt", new CalculateBolt().withWindow(duration,duration), 1).setShuffleGrouping("parse-blot");
builder.setBolt("hbase-bolt", new HbaseBolt(), 1).setShuffleGrouping("calculate-bolt");
StormSubmitter.submitTopology("StatisticsTopology", conf, builder-createTopogy());
ParseBolt代码如下:
public class ParseBolt extends BaseBasicBolt {
@Override
public void prepare(Map map, TopologyContext topologyContext) {
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
byte[] buffer = (byte[]) tuple.getValueByField("bytes");
String strs = new String(buffer);
// 从每个Json对象中解析出ip、url和timestamp三个字段
JSONObject json = JSON.parseObject(strs);
String ip = (String) json.get("ip");
String url = (String) json.get("url");
String timestamp = (String) json.get("timestamp");
collector.emit(new Values(url, ip, timestamp));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("url", "ip", "timestamp"));
}
}
Storm Topology是由一个Spout和多个Blot,每个Spout或Blot可通过任务并行化的方式运行在集群中。为了保证Topology可靠地运行在集群中,Storm提供了一整套分布式运行时环境,该环境由Nimbus、Supervisor和Zookeeper等组件构成。
1. Topology生命周期
如图13-8所示,可类比MapReduce Job学习Storm Topology:MapReduce Job分为Map和Reduce两个阶段,这两个阶段均通过任务并行化的方式运行,其中Map阶段启动多个MapTask, Reduce阶段启动多个Reduce Task, Reduce Task依赖于Map Task输出结果,但同类Task之间是彼此独立的。对于Storm Topology而言,它通常有一个Spout和多个Blot阶段构成,这些阶段存在数据依赖关系,进而形成一个DAG,也是通过任务并行化的方式运行,各个阶段均可以启动多个独立的Task并执行。
图13-8 MapReduce与Storm对比
如图13-9所示,Storm Topology从提交到运行,依次经历以下几个步骤:
2. Storm运行时环境
一个Storm集群是由Nimbus、Supervisor和ZooKeeper三类组件构成,其中Nimbus负责调度和容错,Supervisor负责启动实际的计算任务,而Zookeeper是Nimbus和Supervisor之间的调度者。
(1) Nimbus
Nimbus扮演master角色,每个Storm集群只有一个,基本职责包括:
(2) Zookeeper
Zookeeper是Nimbus和Supervisor之间的协调者,它的引入使得Nimbus和Supervisor之间解耦,由于Storm集群中所有状态均可靠地保存在ZooKeeper上,这使得Nimbus自身变成无状态,以上设计使得Storm集群的容错性和鲁棒性很好。
在Storm中,Zookeeper数据组织方式如图13-10所示
内容来源于网络,如有侵权,请联系作者删除!