1**、Storm的安装和配置**
Storm主要有两种安装模式,分别是伪分布模式(也叫单机模式)和全分布模式(也叫集群模式),这里需要注意的是,Storm的运行依赖于Zookeeper,因此在安装Storm之前,要先安装好Zookeeper。前面已经在主机hadoop221上安装好了单机版的Zookeeper,在主机hadoop222、hadoop223和hadoop224上安装好了全分布版的Zookeeper,本文就基于这个前提条件,分别介绍Storm的伪分布模式和全分布模式的安装和配置。
A**、Storm伪分布模式**
Storm伪分布模式仅需要一台机器,提供的基本功能跟全分布式模式类似,一般用于开发和测试,下面就介绍如何在主机hadoop221上安装和配置伪分布模式的Storm。
首先,将Storm安装包apache-storm-1.0.3.tar.gz上传到/root/tools目录下,运行命令tar -zxvf apache-storm-1.0.3.tar.gz-C /root/training/,将Storm解压到/root/training目录下;然后,配置Storm的环境变量,运行命令vi /root/.bash_profile,在文件末尾添加如下代码:
STORM_HOME=/root/training/apache-storm-1.0.3
export STORM_HOME
PATH=$STORM_HOME/bin:$PATH
export PATH
保存退出,并运行命令source /root/.bash_profile,使环境变量生效;接下来需要配置Storm的核心配置文件storm.yaml,进入到/root/training/apache-storm-1.0.3/conf目录下,运行命令vi storm.yaml,编辑storm.yaml文件,修改如下:
/#配置Zookeeper地址,注意“-”后有一个空格
storm.zookeeper.servers:
-"hadoop221"
/#配置主节点地址,注意“:”后有一个空格
nimbus.seeds: ["hadoop221"]
/#增加如下一行代码,设置Storm客户端提交任务的jar包所保存的位置目录
storm.local.dir:"/root/training/apache-storm-1.0.3/tmp"
/#增加如下几行代码,配置每个从节点上运行worker的个数及对应的端口号
supervisor.slots.ports:
6700
6701
6702
6703
/#增加如下一行代码,设置启用Event Logger,可以用于打开Debug,查看处理的数据
"topology.eventlogger.executors": 1
保存退出,运行命令mkdir/root/training/apache-storm-1.0.3/tmp,创建tmp目录。至此,Storm的伪分布模式安装配置完成。在启动Storm前,先确保Hadoop和Zookeeper都已经正常运行,然后运行命令storm nimbus &,以后台方式启动Storm主节点Nimbus;运行命令storm supervisor &,以后台方式启动Storm从节点Supervisor;运行命令storm ui &,以后台方式启动Storm网页;在浏览器地址栏中输入http://192.168.12.221:8080,可以看到如下界面:
B**、Storm全分布模式**
Storm的全分布模式,也叫作集群模式,这是生产环境中经常使用的一种安装模式,需要在多台机器上进行安装和配置。下面就介绍在主机hadoop222(安装主节点Nimbus)、hadoop223(安装从节点Supervisor)和hadoop224(安装从节点Supervisor)上安装Storm全分布模式。
在主机hadoop222上解压Storm安装包,然后在三台主机上均配置好Storm的环境变量,这些操作跟前面完全一样,这里不做赘述。然后,配置Storm的核心配置文件storm.yaml,修改如下:
/#配置Zookeeper集群的地址
storm.zookeeper.servers:
-"hadoop222"
-"hadoop223"
-"hadoop224"
/#配置Storm主节点Nimbus的地址
nimbus.seeds: ["hadoop222"]
/#增加如下一行代码,设置Storm客户端提交任务的jar包所保存的位置目录
storm.local.dir:"/root/training/apache-storm-1.0.3/tmp"
/#增加如下几行代码,配置每个从节点上运行worker的个数及对应的端口号
supervisor.slots.ports:
6700
6701
6702
6703
/#增加如下一行代码,设置启用Event Logger,可以用于打开Debug,查看处理的数据
"topology.eventlogger.executors": 1
保存退出,运行命令mkdir/root/training/apache-storm-1.0.3/tmp,创建tmp目录;然后,分别运行命令scp -r/root/training/apache-storm-1.0.3/ root@hadoop223:/root/training/,scp -r /root/training/apache-storm-1.0.3/root@hadoop224:/root/training/,将配置好的Storm通过网络拷贝到主机hadoop223和主机hadoop224的/root/training/目录上,这样相当于主机hadoop223和hadoop224也安装配置好了Storm。如此,Storm的全分布模式就算安装完成了。
下面验证Storm是否安装成功。在启动Storm相应组件之前,确保Hadoop和Zookeeper正常启动。然后,在hadoop222上依次运行storm nimbus &,storm ui &;在从节点hadoop223和从节点hadoop224上运行storm supervisor &。在浏览器地址栏中输入http://192.168.12.222:8080,可以看到如下界面:
C**、Storm HA模式的配置**
Storm HA(High Availability)模式是为了增强Storm集群的可用性,通过为Storm集群配置多个Nimbus主节点实现,当正在运行的主节点意外宕机后,备用的主节点可以立刻接管工作,保证Storm集群正常工作。
前面已经安装配置好了Storm的全分布模式,在这个基础之上,再来安装Storm的HA模式,非常简单,只需要在配置文件中稍做修改即可。停止Storm集群的运行,然后编辑三台主机上Storm的核心配置文件storm.yaml,将nimbus.seeds:["hadoop222"]修改为nimbus.seeds: ["hadoop222","hadoop223"],这样将在主机hadoop223上也运行Storm的一个主节点Nimbus。
最后,启动Storm集群。在主机hadoop222上依次运行storm nimbus &、storm ui &、storm logviewer &(以后台方式启动日志查看器);在主机hadoop223上依次运行storm nimbus &、storm supervisor &、storm ui &以及storm logviewer &;在主机hadoop224上依次运行storm supervisor &和storm logviewer &。在浏览器地址栏中输入http://192.168.12.222:8080,可以看到如下界面:
2**、Storm Demo的运行**
在介绍运行Storm Demo示例之前,先简单介绍Storm的常用命令。在Storm中有许多简单且有用的命令可以用来管理拓扑,它们可以提交、杀死、禁用、再平衡拓扑等。
提交任务的命令格式为:storm jar【jar路径】【拓扑包名.拓扑类名】【拓扑名称】,如storm jar storm-starter-topologies-1.0.3.jarorg.apache.storm.starter.WordCountTopology MyWorcCountTest;
*
杀死Storm任务的命令格式为:storm kill【拓扑名称】-w 10,如storm kill topologyName –w10;
*
停用Storm任务的命令格式为:storm deactive【拓扑名称】,如storm deactive topologyName;
*
启动Storm任务的命令格式为:storm activate【拓扑名称】,如storm activate topologyName;
*
重新部署Storm任务的命令格式为:storm rebalance【拓扑名称】,如storm rebalance topologyName;
再平衡使你重分配Storm集群任务,这是个非常强大的命令,比如,你向一个运行中的集群增加了节点,再平衡命令将会停用拓扑,然后在相应超时时间之后重分配Worker,并重启拓扑。
在Storm的安装目录/root/training/apache-storm-1.0.3/examples/storm-starter下,storm-starter-topologies-1.0.3.jar包中包含有多个Storm程序实例,具体使用方法可以参考该目录下README.markdown文件中的说明。Storm示例中也提供了程序世界中的HelloWorld例子,下面就在伪分布模式下(主机hadoop221上)来运行该实例。
进入到/root/training/apache-storm-1.0.3/examples/storm-starter目录下,运行命令storm jar storm-starter-topologies-1.0.3.jarorg.apache.storm.starter.WordCountTopology MyWorcCountTest,然后在浏览器地址栏中输入http://192.168.12.221:8080,可以看到如下界面:
点击Topology Summary下的MyWordCountTest(这是在命令行中自定义的Storm任务名称),可以看到如下界面:
点击Topology actions下面的Debug(打开Debug模式),会弹出一个设置对话框,其中的数字表示采样比率(数值越大在相同时间内查看到的数据就越多,比如50,表示每采集100条数据显示50条),可以自己随机设定,这里就设置为50;然后,可以点击Spouts(All time)下面的spout或者Bolts(All time)下面的count或split,这里点击spout,看到如下界面:
再点击Component summary下面的events,可以看到如下界面。需要注意的是,查看完后需要将Debug开关关闭,否则会一直采集数据占用掉大量的存储空间,并将任务kill掉。
3**、Storm内部通信机制**
同一个Worker间消息的发送使用的是LMAX Disruptor,它负责同一个节点(同一个进程内)上线程间的通信;Disruptor使用了一个RingBuffer替代队列,用生产者消费者指针替代锁;生产者消费者指针使用CPU支持的整数自增,无需加锁并且速度很快,Java的实现在Unsafe package中;不同Worker间通信使用ZeroMQ(0.8版本)或者Netty(0.9.0版本);不同Topology之间的通信,Storm不负责,需要开发者自己想办法进行实现,例如可以使用Kafka等。
Storm中Worker进程内部的结构如下图所示:
每一个Worker进程都有一个单独的线程来监听该Worker的端口号,并接收发送到该端口的数据,它将通过网络发送过来的数据放到Worker的接收队列里面。监听的端口号是通过supervisor.slots.ports定义(conf/storm.yaml中进行配置),每个节点配置几个端口号就可以有几个Worker。与通信相关的几个配置项介绍如下:
supervisor.slots.ports.worker:Worker进程的接收线程的监听端口号;
*
topology.receiver.buffer.size:Worker进程中接收线程缓存消息的大小,它将该缓存消息发送给executor线程,需要为2的倍数;
*
topology.transfer.buffer.size:Worker进程中向外发送消息的缓存大小;
*
topology.executor.receive.buffer.size:executor线程的接收队列大小,需要为2的倍数;
*
topology.executor.send.buffer.size:executor线程的发送队列大小,需要为2的倍数
4**、Storm编程模型**
在一个Storm程序中,其逻辑处理组件主要包含两个,即Spout和Bolt,它们之间的结构如下图所示:
除此之外,在Storm程序中,通常还涉及其他一些成员,总结如下:
Topology:Storm中运行的一个实时应用程序(拓扑),通常也叫Storm任务(Job);
*
Spout:在一个Topology中获取数据流的组件。通常情况下Spout会从外部数据源(如Kafka等)中获取数据,然后转换为Topology内部的源数据Tuple,一般情况下,使用Storm提供的Spout就够用了;
*
Bolt:接收Tuple数据,然后执行处理的组件,用户可以在其中执行自己的逻辑处理操作,也就是说业务逻辑的处理在这个组件中;
*
Tuple:Storm中处理数据的基本单元,其Schema定义字段名称和顺序,可以理解为一组数据就是一个Tuple;
*
Stream:Tuple的管道(类似Unix管道),表示数据的流向;一个Stream中的Tuple有固定的Schema,每个Spout、Bolt都有一个默认的Stream,Spout、Bolt可以有多个Stream;
*
Grouping:数据分组策略,定义一个Tuple数据如何发送给下一个Bolt组件,主要包括ShuffleGrouping(随机分组)、FieldsGrouping(按字段分组,按数据中field值进行分组,相同field值的Tuple被发送到相同的Task)、DirectGrouping(直接分组,需要指定TaskId)、AllGrouping(广播分组)以及NoneGrouping(不分组),后文将对这些常用的分组策略进行详细的介绍。
5**、Storm数据分组策略**
所谓的Grouping策略,其实就是在Spout组件和Bolt组件之间以及Bolt组件与Bolt组件之间传递Tuple的方式,总共有8种方式,总结如下:
ShuffleGrouping(随机分组):随机分发Tuple给Bolt组件对应的任务,能够保证各Task中处理的数据均衡;
*
FieldsGrouping(按字段分组):根据指定的字段,具有相同值的Tuple被分配到同一个Bolt进行处理,比如按“user-id”这个字段来分组,那么具有相同“user-id”值的Tuple会被分配到相同的Bolt组件里的一个Task,而具有不同“user-id”值的Tuple则会被分配到不同的Bolt中的Task;
*
DirectGrouping(直接分组):指向型分组,由Tuple的发射者直接决定Tuple将发射给哪个Bolt,一般情况下是由接收Tuple的Bolt决定接收哪个Bolt发射的Tuple。这是一种比较特殊的分组方法,使用这种分组意味着Tuple的发送者指定由Tuple接收者的某个Task来进行处理。只有被声明为DirectStream的消息流可以声明这种分组方法,而且这种Tuple必须使用emitDirect方法来发射,Tuple处理者可以通过TopologyContext来获取处理它的Task的id(OutputCollector.emit方法也会返回Task的id);
*
AllGrouping(广播分组):Tuple被复制分发到所有Bolt,即所有Bolt组件都可以收到该Tuple,这种分组方式需要谨慎使用;
*
GlobalGrouping(全局分组):全部Tuple都被分发给Bolt组件的同一个任务Task,明确地说,是分发给ID最小的那个Task;
*
NoneGrouping(不分组):这个分组的意思是指Stream不关心到底怎样分组,目前这种分组和ShuffleGrouping是一样的效果,有一点不同的是,Storm会把使用NoneGrouping的这个Bolt放到其订阅者同一个线程中去执行;
*
LocalOrShuffleGrouping(本地或随机分组):如果目标Bolt有一个或者多个Task与源Bolt的Task在同一个工作进程中,Tuple将会被随机发送给这些同进程中的Task,否则,就和普通的ShuffleGrouping分组一样;
*
CustomGrouping(自定义分组):如同MapReduce中自己去实现一个Partition一样,用户可以自定义分组策略。
6**、Storm编程实例**
下面,介绍如何编写自己的Storm程序实例WordCount。该程序包括四个组件,每个组件对应一个Java类,各自有其特定的功能作用。
WordCountSpout组件类(负责获取数据)源代码如下图所示:
WordCountSplitBolt组件类(负责拆分单词)源代码如下图所示:
WordCountTotalBolt组件类源代码如下图所示:
WordCountTopology主程序类源代码如下图所示:
最终,该程序在Eclipse中运行结果如下图所示。可以看到,每隔3秒钟,程序会随机采集一条数据,然后对该条数据进行单词拆分,最后进行单词次数统计并将结果输出显示到屏幕。
当然了,可以对上述程序稍加修改,然后导出jar包放到Storm集群上进行运行,运行方式与前面Storm自带WordCount示例的运行方式类似,主程序代码如下图所示:
7**、Storm本地和Storm Zookeeper目录树**
前面安装Storm时,在Storm的核心配置文件中定义了storm.local.dir:"/root/training/apache-storm-1.0.3/tmp",当启动Storm并运行了相应的任务,便会将相关的一些信息保存到该tmp目录下,简单介绍tmp的目录树结构如下图所示:
同样地,当提交了一个Storm任务到Storm集群上运行时,也会保存Storm的节点及相关任务信息到Zookeeper中,简单介绍Storm Zookeeper的目录树结构如下图所示:
8**、Storm任务提交的过程**
Storm任务提交的详细过程如下图所示:
这里对Storm任务提交的过程进行简单总结如下:
客户端提交Topology到Nimbus(提交的jar包被上传到Nimbus下的inbox目录);
1.
jar包中的submitTopology方法会对Topology进行一些检查处理(如Bolt/Spout的id是否违法,Storm是否是active等),然后在Nimbus服务器上建立Topology本地目录进行存储(包含Topology的jar包以及Topology的序列化对象);
1.
之后Nimbus进行任务分配(根据Topology定义的一些参数来对Bolt/Spot设定Task的数量并分配对应的task-Id),将分配好的Task信息(Task信息包括Task的心跳信息,Topology的描述信息等)发送到Zookeeper对应的目录下;
1.
Supervisor定期到Zookeeper相应目录下查看是否有新的任务,有的话下载下来,根据任务的描述信息启动相应的Worker进行工作;
1.
Worker根据任务的描述信息创建相应的网络连接来发送消息。
9**、Storm与外部系统组件的集成**
在Storm的安装目录/root/training/apache-storm-1.0.3/external下,可以看到如下图所示的内容,这些目录下存放的是Storm与外部系统集成所需要的jar包。
下面以Storm与HBase的集成为例,在前面WordCount程序的基础上,介绍具体的实现方法。WordCountSpout、WordCountSplitBolt以及WordCountTotalBolt三个Java类保持不变,开发的第三级Bolt组件类(负责将数据保存到HBase表中)代码如下图所示:
Storm与HBase集成的WordCount程序的主程序类源代码如下图所示:
在运行程序前,需要在HBase中创建result表(带有列族info),命令为create 'result','info',并查询result表结果如下图所示,可以看到此时result表中没有任何数据。
等待程序执行几秒钟后,再查看result表,如下图所示
再等待几秒钟后,查看result表,如下图所示。可以看到,随着程序的执行,单词出现次数的累加结果被更新并存储到了HBase的result表中。
这里仅介绍Storm与HBase的集成,但Storm可与外部系统非常多的组件进行集成,如JDBC、Redis、HDFS、Kafka、Hive以及JMS等等,有兴趣的朋友可以自行再去学习,这里不再介绍。至此,Storm的实战介绍就告一段落了,下期再见。
参考文献:
——《实时大数据分析 基于Storm、Spark技术的实时应用》
——《CSDN博客》
——《潭州大数据课程课件》
转自https://mp.weixin.qq.com/s/rkMKv4UwLL5hY5JXHhJGYA
内容来源于网络,如有侵权,请联系作者删除!