海量日志采集工具——Flume

x33g5p2x  于2022-06-27 转载在 Flume  
字(24.3k)|赞(0)|评价(0)|浏览(930)

海量日志采集工具——Flume

一、Flume的简介

1.1、大数据处理流程

在企业中,大数据的处理流程一般是:

  1. 数据采集
  2. 数据存储
  3. 数据清洗
  4. 数据分析
  5. 数据展示

参考下图:

在数据采集和搜集的工具中,Flume框架占有一定的市场份量。

1.2、Flume的简介

Flume 是一种分布式、可靠且可用的服务,用于高效收集、聚合和移动大量日志数据。它具有基于流数据流的简单灵活的架构。它具有可调整的可靠性机制以及许多故障转移和恢复机制,具有健壮性和容错性。它使用允许在线分析应用程序的简单可扩展数据模型。
参考官网: http://flume.apache.org/

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

flume 最开始是由 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。但随着 flume 功能的扩展,flume的代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点渐渐暴露出来,尤其是在发行版本 0.9.4中,日志传输不稳定的现象尤为严重。

为了解决这些问题,2011 年 10 月 22 号,cloudera 对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,并将 Flume 纳入 apache 旗下,从cloudera Flume 改名为 Apache Flume。

1.3、版本区别

​ 为了与之前版本区分开,重构后的版本统称为 Flume NG(next generation),重构前的版本被统称为 Flume OG(original generation),Flume目前只有Linux系统的启动脚本,没有Windows环境的启动脚本。

二、Flume的体系结构

2.1、体系结构简介

Flume 运行的核心是 Agent。Flume是以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方。如下图所示:

2.2、组件及其作用

  1. - Client
  2. 客户端,Client生产数据,运行在一个独立的线程中
  3. - Event
  4. 一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。)
  5. - Flow
  6. Event从源点到达目的点的迁移的抽象。
  7. - Agent
  8. 一个独立的Flume进程,运行在JVM中,包含组件Source Channel Sink
  9. 每台机器运行一个agent,但是一个agent中可以包含多个sourcessinks
  10. - Source
  11. 数据收集组件。sourceClient收集数据,传递给Channel
  12. - Channel
  13. 管道,负责接收source端的数据,然后将数据推送到sink端。
  14. - Sink
  15. 负责从channel端拉取数据,并将其推送到持久化系统或者是下一个Agent
  16. - selector
  17. 选择器,作用于source端,然后决定数据发往哪个目标。
  18. - interceptor
  19. 拦截器,flume允许使用拦截器拦截数据。允许使用拦截器链,作用于sourcesink阶段。

三、Flume的安装

3.1、安装和配置环境变量

3.1.1、准备软件包

将apache-flume-1.8.0-bin.tar.gz 上传到linux系统中的/root/soft/目录中

3.1.2、解压软件包

  1. [root@master soft]# pwd
  2. /root/soft
  3. [root@master soft]# tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local/

3.1.3、更名操作

  1. [root@master soft]# cd /usr/local/
  2. [root@master local]# mv apache-flume-1.8.0-bin/ flume

3.1.4、配置环境变量

  1. [root@master local]# vi /etc/profile
  2. ........省略..........
  3. export FLUME_HOME=/usr/local/flume
  4. export PATH=$FLUME_HOME/bin:$PATH
  5. # 加载环境变量
  6. [root@master apps]# source /etc/profile

3.1.5、验证环境变量

  1. [root@master local]# flume-ng version
  2. Flume 1.8.0
  3. Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
  4. Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
  5. Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
  6. From source with checksum fbb44c8c8fb63a49be0a59e27316833d

3.2、配置文件

  1. [root@master local]# cd flume/conf/
  2. [root@master conf]# ll #查看里面是否有一个flume-env.sh.template文件
  3. [root@master conf]# cp flume-env.sh.template flume-env.sh
  4. [root@master conf]# vim flume-env.sh
  5. ........省略..........
  6. export JAVA_HOME=/usr/local/jdk
  7. ........省略..........

四、Flume的部署

4.1、数据模型

  • 单一数据模型
  • 多数据流模型
4.1.1、单一数据模型

在单个 Agent 内由单个 Source, Channel, Sink 建立一个单一的数据流模型,如下图所示,整个数据流为

Web Server --> Source --> Channel --> Sink --> HDFS。

4.1.2、多数据流模型

**1)**多 Agent 串行传输数据流模型

**2)**多 Agent 汇聚数据流模型

**3)**单 Agent 多路数据流模型

**4)**Sinkgroups 数据流模型

4.1.3、小总结
  1. flume提供的数据流模型中,几个原则很重要。
  2. Source--> Channel
  3. 1.单个Source组件可以和多个Channel组合建立数据流,既可以replicating  multiplexing
  4. 2.多个Sources可以写入单个 Channel
  5. Channel-->Sink
  6. 1.多个Sinks又可以组合成SinkgroupsChannel中获取数据,既可以loadbalancingfailover机制。
  7. 2.多个Sinks也可以从单个Channel中取数据。
  8. 3.单个Sink只能从单个Channel中取数据
  9. 根据上述 5 个原则,你可以设计出满足你需求的数据流模型。

4.2、配置介绍

4.2.1、定义组件名称

要定义单个代理中的流,您需要通过通道链接源和接收器。您需要列出给定代理的源,接收器和通道,然后将源和接收器指向一个通道。一个源实例可以指定多个通道,但是一个接收器实例只能指定一个通道。格式如下:

  1. # list the sources, sinks and channels for the agent
  2. <Agent>.sources = <Source>
  3. <Agent>.sinks = <Sink>
  4. <Agent>.channels = <Channel1> <Channel2>
  5. # set channel for source
  6. <Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
  7. # set channel for sink
  8. <Agent>.sinks.<Sink>.channel = <Channel1>

案例如下:

  1. # list the sources, sinks and channels for the agent
  2. agent_foo.sources = avro-appserver-src-1
  3. agent_foo.sinks = hdfs-sink-1
  4. agent_foo.channels = mem-channel-1
  5. # set channel for source
  6. agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
  7. # set channel for sink
  8. agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
4.2.2、配置组件属性
  1. # properties for sources
  2. <Agent>.sources.<Source>.<someProperty> = <someValue>
  3. # properties for channels
  4. <Agent>.channel.<Channel>.<someProperty> = <someValue>
  5. # properties for sinks
  6. <Agent>.sources.<Sink>.<someProperty> = <someValue>

案例如下:

  1. agent_foo.sources = avro-AppSrv-source
  2. agent_foo.sinks = hdfs-Cluster1-sink
  3. agent_foo.channels = mem-channel-1
  4. # set channel for sources, sinks
  5. # properties of avro-AppSrv-source
  6. agent_foo.sources.avro-AppSrv-source.type = avro
  7. agent_foo.sources.avro-AppSrv-source.bind = localhost
  8. agent_foo.sources.avro-AppSrv-source.port = 10000
  9. # properties of mem-channel-1
  10. agent_foo.channels.mem-channel-1.type = memory
  11. agent_foo.channels.mem-channel-1.capacity = 1000
  12. agent_foo.channels.mem-channel-1.transactionCapacity = 100
  13. # properties of hdfs-Cluster1-sink
  14. agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
  15. agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
  16. #...

4.3、常用的source和sink种类

4.3.1、常用的flume sources
  1. # Avro source:
  2. avro
  3. # Syslog TCP source:
  4. syslogtcp
  5. # Syslog UDP Source:
  6. syslogudp
  7. # HTTP Source:
  8. http
  9. # Exec source:
  10. exec
  11. # JMS source:
  12. jms
  13. # Thrift source:
  14. thrift
  15. # Spooling directory source:
  16. spooldir
  17. # Kafka source:
  18. org.apache.flume.source.kafka,KafkaSource
  19. .....
4.3.2、常用的flume channels
  1. # Memory Channel
  2. memory
  3. # JDBC Channel
  4. jdbc
  5. # Kafka Channel
  6. org.apache.flume.channel.kafka.KafkaChannel
  7. # File Channel
  8. file
4.3.3、常用的flume sinks
  1. # HDFS Sink
  2. hdfs
  3. # HIVE Sink
  4. hive
  5. # Logger Sink
  6. logger
  7. # Avro Sink
  8. avro
  9. # Kafka Sink
  10. org.apache.flume.sink.kafka.KafkaSink
  11. # Hbase Sink
  12. hbase

五、案例演示

配置可参考:https://flume.apache.org/releases/content/1.10.0/FlumeUserGuide.html

5.1、案例演示:avro+memory+logger

Avro Source:监听一个指定的Avro端口,通过Avro端口可以获取到Avro client发送过来的文件,即只要应用程序通过Avro端口发送文件,source组件就可以获取到该文件中的内容,输出位置为Logger

5.1.1、编写采集方案

  1. [root@master flume]# mkdir flumeconf
  2. [root@master flume]# cd flumeconf
  3. [root@master flumeconf]# vi avro-logger.conf
  4. #定义各个组件的名字
  5. a1.sources=avro-sour1
  6. a1.channels=mem-chan1
  7. a1.sinks=logger-sink1
  8. #定义sources组件的相关属性
  9. a1.sources.avro-sour1.type=avro
  10. a1.sources.avro-sour1.bind=master
  11. a1.sources.avro-sour1.port=9999
  12. #定义channels组件的相关属性
  13. a1.channels.mem-chan1.type=memory
  14. #定义sinks组件的相关属性
  15. a1.sinks.logger-sink1.type=logger
  16. a1.sinks.logger-sink1.maxBytesToLog=100
  17. #组件之间进行绑定
  18. a1.sources.avro-sour1.channels=mem-chan1
  19. a1.sinks.logger-sink1.channel=mem-chan1

5.1.2、启动Agent

  1. [root@master flumeconf]# flume-ng agent -c ../conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

5.1.3、测试数据

另起一个窗口测试

  1. [root@master ~]# mkdir flumedata
  2. [root@master ~]# cd flumedata/
  3. [root@master flumedata]#
  4. [root@master flumedata]# date >> test.data
  5. [root@master flumedata]# cat test.data
  6. 2019 11 21 星期四 21:22:36 CST
  7. [root@master flumedata]# ping master >> test.data
  8. [root@master flumedata]# cat test.data
  9. ....省略....
  10. [root@master flumedata]# flume-ng avro-client -c /usr/local/flume-1.6.0/conf/ -H master -p 9999 -F ./test.dat

5.2、实时采集(监听文件):exec+memory+hdfs

Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
#常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容

memory:传输数据的Channel为Memory

hdfs 是输出目标为Hdfs

5.2.1、配置方案

  1. [root@master flumeconf]# vim exec-hdfs.conf
  2. # 定义sources的属性
  3. a1.sources=r1
  4. a1.sources.r1.type=exec
  5. a1.sources.r1.command=tail -F /root/flumedata/test.data
  6. # 定义sinks的属性
  7. a1.sinks=k1
  8. a1.sinks.k1.type=hdfs
  9. a1.sinks.k1.hdfs.path=hdfs://master:8020/flume/tailout/%y-%m-%d/%H%M/
  10. a1.sinks.k1.hdfs.filePrefix=events
  11. a1.sinks.k1.hdfs.round=true
  12. a1.sinks.k1.hdfs.roundValue=10
  13. a1.sinks.k1.hdfs.roundUnit=second
  14. a1.sinks.k1.hdfs.rollInterval=3
  15. a1.sinks.k1.hdfs.rollSize=20
  16. a1.sinks.k1.hdfs.rollCount=5
  17. a1.sinks.k1.hdfs.batchSize=1
  18. a1.sinks.k1.hdfs.useLocalTimeStamp=true
  19. a1.sinks.k1.hdfs.fileType=DataStream
  20. # 定义channels的属性
  21. a1.channels
  22. channels=c1
  23. a1.channels.c1.type=memory
  24. a1.channels.c1.capacity=1000
  25. a1.channels.c1.transactionCapacity=100
  26. # 组件之间的绑定
  27. a1.sources.r1.channels=c1
  28. a1.sinks.k1.channel=c1

5.2.2、启动Agent

  1. [root@master flumeconf]# flume-ng agent -c ../conf -f ./exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

5.2.3、测试数据

  1. [root@master flumedata]# ping master >> test.data

5.3、实时采集(监听目录):spool+ mem+logger

spool: Source来源于目录,有文件进入目录就摄取。

mem:通过内存来传输数据

logger:是传送数据到日志

5.3.1、配置方案

  1. [root@master flumeconf]# vi spool-logger.conf
  2. a1.sources = r1
  3. a1.channels = c1
  4. a1.sinks = s1
  5. a1.sources.r1.type=spooldir
  6. a1.sources.r1.spoolDir = /home/flume/spool
  7. a1.sources.r1.fileSuffix = .COMPLETED
  8. a1.sources.r1.deletePolicy=never
  9. a1.sources.r1.fileHeader=false
  10. a1.sources.r1.fileHeaderKey=file
  11. a1.sources.r1.basenameHeader=false
  12. a1.sources.r1.basenameHeaderKey=basename
  13. a1.sources.r1.batchSize=100
  14. a1.sources.r1.inputCharset=UTF-8
  15. a1.sources.r1.bufferMaxLines=1000
  16. a1.channels.c1.type=memory
  17. a1.sinks.s1.type=logger
  18. a1.sinks.s1.maxBytesToLog=100
  19. a1.sources.r1.channels=c1
  20. a1.sinks.s1.channel=c1

5.3.2、启动agent

  1. [root@master flumeconf]# flume-ng agent -c ../conf -f ./spool-logger.conf -n a1 -Dflume.root.logger=INFO,console

5.3.3、测试

  1. [root@master ~]# for i in `seq 1 10`; do echo $i >> /home/flume/spool/$i;done

5.4、案例演示:http+ mem+logger

http: 表示数据来源是http网络协议,一般接收的请求为get或post请求. 所有的http请求会通过插件格式的Handle转化为一个flume的Event数据.

mem:表示用内存传输通道

logger:表示输出格式为Logger格式

5.4.1、配置方案

  1. [root@master flumeconf]# vi http-logger.conf
  2. a1.sources = r1
  3. a1.channels = c1
  4. a1.sinks = s1
  5. a1.sources.r1.type=http
  6. a1.sources.r1.bind = master
  7. a1.sources.r1.port = 6666
  8. a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
  9. a1.channels.c1.type=memory
  10. a1.sinks.s1.type=logger
  11. a1.sinks.s1.maxBytesToLog=16
  12. a1.sources.r1.channels=c1
  13. a1.sinks.s1.channel=c1

5.4.2、启动agent的服务

  1. [root@master ~]# flume-ng agent -c ../conf -f ./http-logger.conf -n a1 -Dflume.root.logger=INFO,console

5.4.3、测试

  1. [root@master ~]# curl -X POST -d '[{"headers":{"name":"zhangsan","pwd":"123456"},"body":"this is my content"}]' http://master:6666

六、拦截器的使用

在Flume运行过程中,Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。拦截器有下面几个特点:

  • 拦截器需要实现org.apache.flume.interceptor.Interceptor接口。
  • 拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件。
  • 拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截。
  • 一个拦截器返回的事件列表被传递给链中的下一个拦截器。
  • 如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可。

6.1、常用拦截器:

  1. Timestamp Interceptor :时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多
  2. Host Interceptor:主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)
  3. Static Interceptor:静态拦截器,用于在events header中加入一组静态的key和value。

6.2、案例演示:Syslogtcp+mem+hdfs

通过时间拦截器,数据源为SyslogTcp,传送的通道模式是FileChannel,最后输出的目的地为HDFS

6.2.1 配置方案:

  1. [root@master flumeconf]# vi ts.conf
  2. a1.sources = r1
  3. a1.channels = c1
  4. a1.sinks = s1
  5. a1.sources.r1.type=syslogtcp
  6. a1.sources.r1.host=master
  7. a1.sources.r1.port=6666
  8. a1.sources.r1.interceptors=i1 i2 i3
  9. a1.sources.r1.interceptors.i1.type=timestamp
  10. a1.sources.r1.interceptors.i1.preserveExisting=false
  11. a1.sources.r1.interceptors.i2.type=host
  12. a1.sources.r1.interceptors.i2.preserveExisting=false
  13. a1.sources.r1.interceptors.i2.useIP=true
  14. a1.sources.r1.interceptors.i2.hostHeader=hostname
  15. a1.sources.r1.interceptors.i3.type=static
  16. a1.sources.r1.interceptors.i3.preserveExisting=false
  17. a1.sources.r1.interceptors.i3.key=hn
  18. a1.sources.r1.interceptors.i3.value=master
  19. a1.channels.c1.type=memory
  20. a1.sinks.s1.type=hdfs
  21. a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
  22. a1.sinks.s1.hdfs.filePrefix=%{hostname}
  23. a1.sinks.s1.hdfs.fileSuffix=.log
  24. a1.sinks.s1.hdfs.inUseSuffix=.tmp
  25. a1.sinks.s1.hdfs.rollInterval=60
  26. a1.sinks.s1.hdfs.rollSize=1024
  27. a1.sinks.s1.hdfs.rollCount=10
  28. a1.sinks.s1.hdfs.idleTimeout=0
  29. a1.sinks.s1.hdfs.batchSize=100
  30. a1.sinks.s1.hdfs.fileType=DataStream
  31. a1.sinks.s1.hdfs.writeFormat=Text
  32. a1.sinks.s1.hdfs.round=true
  33. a1.sinks.s1.hdfs.roundValue=1
  34. a1.sinks.s1.hdfs.roundUnit=second
  35. a1.sinks.s1.hdfs.useLocalTimeStamp=true
  36. a1.sources.r1.channels=c1
  37. a1.sinks.s1.channel=c1

6.2.2 启动agent的服务:

  1. [root@master flumeconf]# flume-ng agent -c ../conf -f ./ts.conf -n a1 -Dflume.root.logger=INFO,console

6.2.3 测试:

  1. [root@master ~]# echo "hello world hello qiangeng" | nc master 6666

6.3、案例演示:regex+Syslogtcp+mem+hdfs

拦截器为正则表达式拦截器, 数据源为Syslogtcp格式,传送通道为MemChannel,最后传送的目的地是HDFS

6.3.1 配置方案

  1. [root@master flumeconf]# vi regex-ts.conf
  2. a1.sources = r1
  3. a1.channels = c1
  4. a1.sinks = s1
  5. a1.sources.r1.type=syslogtcp
  6. a1.sources.r1.host = master
  7. a1.sources.r1.port = 6666
  8. a1.sources.r1.interceptors=i1
  9. a1.sources.r1.interceptors.i1.type=regex_filter
  10. #不要加引号包裹正则
  11. a1.sources.r1.interceptors.i1.regex=^[0-9].*$
  12. a1.sources.r1.interceptors.i1.excludeEvents=false
  13. a1.channels.c1.type=memory
  14. a1.channels.c1.capacity=1000
  15. a1.channels.c1.transactionCapacity=100
  16. a1.channels.c1.keep-alive=3
  17. a1.channels.c1.byteCapacityBufferPercentage=20
  18. a1.channels.c1.byteCapacity=800000
  19. a1.sinks.s1.type=hdfs
  20. a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
  21. a1.sinks.s1.hdfs.filePrefix=%{hostname}
  22. a1.sinks.s1.hdfs.fileSuffix=.log
  23. a1.sinks.s1.hdfs.inUseSuffix=.tmp
  24. a1.sinks.s1.hdfs.rollInterval=60
  25. a1.sinks.s1.hdfs.rollSize=1024
  26. a1.sinks.s1.hdfs.rollCount=10
  27. a1.sinks.s1.hdfs.idleTimeout=0
  28. a1.sinks.s1.hdfs.batchSize=100
  29. a1.sinks.s1.hdfs.fileType=DataStream
  30. a1.sinks.s1.hdfs.writeFormat=Text
  31. a1.sinks.s1.hdfs.round=true
  32. a1.sinks.s1.hdfs.roundValue=1
  33. a1.sinks.s1.hdfs.roundUnit=second
  34. a1.sinks.s1.hdfs.useLocalTimeStamp=true
  35. a1.sources.r1.channels=c1
  36. a1.sinks.s1.channel=c1

6.3.2、启动agent的服务:

  1. [root@master flumeconf]# flume-ng agent -c ../conf -f ./regex-ts.conf -n a1 -Dflume.root.logger=INFO,console

6.3.3、测试:

  1. [root@master ~]# echo "hello world hello qiangeng" | nc master 6666
  2. [root@master ~]# echo "123123123 hello world hello qiangeng" | nc master 6666

6.4、自定义拦截器

6.4.0 需求:

  1. 为了提高Flume的扩展性,用户可以自己定义一个拦截器
  2. eventbody中的数据, 以数字开头的, 存储为 hdfs://master:8020/flume/number.log s1
  3. eventbody中的数据, 以字母开头的, 存储为 hdfs://master:8020/flume/character.log s2
  4. eventbody中的数据, 其他的开头的, 存储为 hdfs://master:8020/flume/other.log s3

6.4.1 pom.xml

可以参考/code/pom.xml

  1. <dependencies>
  2. <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
  3. <dependency>
  4. <groupId>org.apache.flume</groupId>
  5. <artifactId>flume-ng-core</artifactId>
  6. <version>1.8.0</version>
  7. </dependency>
  8. </dependencies>

6.4.2 代码

具体代码可以参考 code/MyInterceptor

  1. public class MyInterceptor implements Interceptor {
  2. private static final String LOCATION_KEY = "location";
  3. private static final String LOCATION_NUM = "number";
  4. private static final String LOCATION_CHAR = "character";
  5. private static final String LOCATION_OTHER = "other";
  6. @Override
  7. public void initialize() {
  8. }
  9. /**
  10. * 当拦截到单个Event的时候调用
  11. * @param event 这个被拦截的Event
  12. * @return 这个拦截器发送出去的Event
  13. */
  14. @Override
  15. public Event intercept(Event event) {
  16. // 对拦截到的event的header中添加指定的键值对
  17. // 1. 获取拦截到的event的body
  18. byte[] body = event.getBody();
  19. // 2. 验证是否是以数字开头
  20. if (body[0] >= '0' && body[0] <= '9') {
  21. // 在这个event的头部添加一个键值对,来标识这个event需要放入到哪一个channel中
  22. event.getHeaders().put(LOCATION_KEY, LOCATION_NUM);
  23. }
  24. else if (body[0] >= 'a' && body[0] <= 'z' || body[0] >= 'A' && body[0] <= 'Z') {
  25. event.getHeaders().put(LOCATION_KEY, LOCATION_CHAR);
  26. }
  27. else {
  28. event.getHeaders().put(LOCATION_KEY, LOCATION_OTHER);
  29. }
  30. return event;
  31. }
  32. /**
  33. * 批量拦截到Event
  34. * @param list 存储Event的集合
  35. * @return 处理之后的Event集合
  36. */
  37. @Override
  38. public List<Event> intercept(List<Event> list) {
  39. for (Event event : list) {
  40. intercept(event);
  41. }
  42. return list;
  43. }
  44. @Override
  45. public void close() {
  46. }
  47. /**
  48. * 设计一个获取拦截器对象的类
  49. */
  50. public static class MyBuilder implements Builder {
  51. @Override
  52. public Interceptor build() {
  53. return new MyInterceptor();
  54. }
  55. @Override
  56. public void configure(Context context) {
  57. }
  58. }
  59. }

6.4.3 打包上传

  1. 使用maven将拦截器打包,然后把此包和依赖的fastjson一起上传到flume lib目录下

6.4.4 编写方案

  1. a1.sources=r1
  2. a1.channels=c1 c2 c3
  3. a1.sinks=s1 s2 s3
  4. a1.sources.r1.channels=c1 c2 c3
  5. a1.sinks.s1.channel=c1
  6. a1.sinks.s2.channel=c2
  7. a1.sinks.s3.channel=c3
  8. # 设置source的属性
  9. a1.sources.r1.type=syslogtcp
  10. a1.sources.r1.host=master
  11. a1.sources.r1.port=12345
  12. # 设置拦截器
  13. a1.sources.r1.interceptors=i1
  14. a1.sources.r1.interceptors.i1.type=com.qf.MyInterceptor$MyBuilder
  15. # 设置选择器的属性
  16. a1.sources.r1.selector.type=multiplexing
  17. a1.sources.r1.selector.header=location
  18. a1.sources.r1.selector.mapping.number=c1
  19. a1.sources.r1.selector.mapping.character=c2
  20. a1.sources.r1.selector.mapping.other=c3
  21. # 设置channel的属性
  22. a1.channels.c1.type=memory
  23. a1.channels.c1.capacity=1000
  24. a1.channels.c2.type=memory
  25. a1.channels.c2.capacity=1000
  26. a1.channels.c3.type=memory
  27. a1.channels.c3.capacity=1000
  28. # 设置sink的属性
  29. a1.sinks.s1.type=hdfs
  30. a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/customInterceptor/s1/%Y-%m-%d-%H
  31. a1.sinks.s1.hdfs.useLocalTimeStamp=true
  32. a1.sinks.s1.hdfs.filePrefix=regex
  33. a1.sinks.s1.hdfs.rollInterval=0
  34. a1.sinks.s1.hdfs.rollSize=102400
  35. a1.sinks.s1.hdfs.rollCount=30
  36. a1.sinks.s1.hdfs.fileType=DataStream
  37. a1.sinks.s1.hdfs.writeFormat=Text
  38. a1.sinks.s2.type=hdfs
  39. a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/customInterceptor/s2/%Y-%m-%d-%H
  40. a1.sinks.s2.hdfs.useLocalTimeStamp=true
  41. a1.sinks.s2.hdfs.filePrefix=regex
  42. a1.sinks.s2.hdfs.rollInterval=0
  43. a1.sinks.s2.hdfs.rollSize=102400
  44. a1.sinks.s2.hdfs.rollCount=30
  45. a1.sinks.s2.hdfs.fileType=DataStream
  46. a1.sinks.s2.hdfs.writeFormat=Text
  47. a1.sinks.s3.type=hdfs
  48. a1.sinks.s3.hdfs.path=hdfs://master:8020/flume/customInterceptor/s3/%Y-%m-%d-%H
  49. a1.sinks.s3.hdfs.useLocalTimeStamp=true
  50. a1.sinks.s3.hdfs.filePrefix=regex
  51. a1.sinks.s3.hdfs.rollInterval=0
  52. a1.sinks.s3.hdfs.rollSize=102400
  53. a1.sinks.s3.hdfs.rollCount=30
  54. a1.sinks.s3.hdfs.fileType=DataStream
  55. a1.sinks.s3.hdfs.writeFormat=Text

6.4.5 启动agent

  1. [root@master flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console

6.4.6 测试:

七、选择器的使用

7.1、说明

Flume中的Channel选择器作用于source阶段 ,是决定Source接受的特定事件写入到哪个Channel的组件,他们告诉Channel处理器,然后由其将事件写入到Channel。

Agent中各个组件的交互

由于Flume不是两阶段提交,事件被写入到一个Channel,然后事件在写入下一个Channel之前提交,如果写入一个Channel出现异常,那么之前已经写入到其他Channel的相同事件不能被回滚。当这样的异常发生时,Channel处理器抛出ChannelException异常,事务失败,如果Source试图再次写入相同的事件(大多数情况下,会再次写入,只有Syslog,Exec等Source不能重试,因为没有办法生成相同的数据),重复的事件将写入到Channel中,而先前的提交是成功的,这样在Flume中就发生了重复。

Channel选择器的配置是通过Channel处理器完成的,Channel选择器可以指定一组Channel是必须的,另一组的可选的。

Flume分类两种选择器,如果Source配置中没有指定选择器,那么会自动使用复制Channel选择器.

  • replicating:该选择器复制每个事件到通过Source的Channels参数指定的所有Channel中。
  • multiplexing:是一种专门用于动态路由事件的Channel选择器,通过选择事件应该写入到哪个Channel,基于一个特定的事件头的值进行路由

7.2、案例演示:replicating selector

7.2.1 配置方案

  1. [root@master flumeconf]# vi rep.conf
  2. a1.sources = r1
  3. a1.channels = c1 c2
  4. a1.sinks = s1 s2
  5. a1.sources.r1.type=syslogtcp
  6. a1.sources.r1.host = master
  7. a1.sources.r1.port = 6666
  8. a1.sources.r1.selector.type=replicating
  9. a1.channels.c1.type=memory
  10. a1.channels.c2.type=memory
  11. a1.sinks.s1.type=hdfs
  12. a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/rep
  13. a1.sinks.s1.hdfs.filePrefix=s1sink
  14. a1.sinks.s1.hdfs.fileSuffix=.log
  15. a1.sinks.s1.hdfs.inUseSuffix=.tmp
  16. a1.sinks.s1.hdfs.rollInterval=60
  17. a1.sinks.s1.hdfs.rollSize=1024
  18. a1.sinks.s1.hdfs.rollCount=10
  19. a1.sinks.s1.hdfs.idleTimeout=0
  20. a1.sinks.s1.hdfs.batchSize=100
  21. a1.sinks.s1.hdfs.fileType=DataStream
  22. a1.sinks.s1.hdfs.writeFormat=Text
  23. a1.sinks.s1.hdfs.round=true
  24. a1.sinks.s1.hdfs.roundValue=1
  25. a1.sinks.s1.hdfs.roundUnit=second
  26. a1.sinks.s1.hdfs.useLocalTimeStamp=true
  27. a1.sinks.s2.type=hdfs
  28. a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/rep
  29. a1.sinks.s2.hdfs.filePrefix=s2sink
  30. a1.sinks.s2.hdfs.fileSuffix=.log
  31. a1.sinks.s2.hdfs.inUseSuffix=.tmp
  32. a1.sinks.s2.hdfs.rollInterval=60
  33. a1.sinks.s2.hdfs.rollSize=1024
  34. a1.sinks.s2.hdfs.rollCount=10
  35. a1.sinks.s2.hdfs.idleTimeout=0
  36. a1.sinks.s2.hdfs.batchSize=100
  37. a1.sinks.s2.hdfs.fileType=DataStream
  38. a1.sinks.s2.hdfs.writeFormat=Text
  39. a1.sinks.s2.hdfs.round=true
  40. a1.sinks.s2.hdfs.roundValue=1
  41. a1.sinks.s2.hdfs.roundUnit=second
  42. a1.sinks.s2.hdfs.useLocalTimeStamp=true
  43. a1.sources.r1.channels=c1 c2
  44. a1.sinks.s1.channel=c1
  45. a1.sinks.s2.channel=c2

7.2.2 启动agent的服务:

  1. [root@master flumeconf]# flume-ng agent -c ../conf -f ./rep.conf -n a1 -Dflume.root.logger=INFO,console

7.2.3 测试:

  1. [root@master ~]# echo "hello world hello qianfeng" | nc master 6666

7.3、案例演示:Multiplexing selector

7.3.1 配置方案

  1. [root@master flumeconf]# vi mul.conf
  2. a1.sources = r1
  3. a1.channels = c1 c2
  4. a1.sinks = s1 s2
  5. a1.sources.r1.type=http
  6. a1.sources.r1.bind = master
  7. a1.sources.r1.port = 6666
  8. a1.sources.r1.selector.type=multiplexing
  9. a1.sources.r1.selector.header = state
  10. a1.sources.r1.selector.mapping.USER = c1
  11. a1.sources.r1.selector.mapping.ORDER = c2
  12. a1.sources.r1.selector.default = c1
  13. a1.channels.c1.type=memory
  14. a1.channels.c2.type=memory
  15. a1.sinks.s1.type=hdfs
  16. a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/mul
  17. a1.sinks.s1.hdfs.filePrefix=s1sink
  18. a1.sinks.s1.hdfs.fileSuffix=.log
  19. a1.sinks.s1.hdfs.inUseSuffix=.tmp
  20. a1.sinks.s1.hdfs.rollInterval=60
  21. a1.sinks.s1.hdfs.rollSize=1024
  22. a1.sinks.s1.hdfs.rollCount=10
  23. a1.sinks.s1.hdfs.idleTimeout=0
  24. a1.sinks.s1.hdfs.batchSize=100
  25. a1.sinks.s1.hdfs.fileType=DataStream
  26. a1.sinks.s1.hdfs.writeFormat=Text
  27. a1.sinks.s1.hdfs.round=true
  28. a1.sinks.s1.hdfs.roundValue=1
  29. a1.sinks.s1.hdfs.roundUnit=second
  30. a1.sinks.s1.hdfs.useLocalTimeStamp=true
  31. a1.sinks.s2.type=hdfs
  32. a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/mul
  33. a1.sinks.s2.hdfs.filePrefix=s2sink
  34. a1.sinks.s2.hdfs.fileSuffix=.log
  35. a1.sinks.s2.hdfs.inUseSuffix=.tmp
  36. a1.sinks.s2.hdfs.rollInterval=60
  37. a1.sinks.s2.hdfs.rollSize=1024
  38. a1.sinks.s2.hdfs.rollCount=10
  39. a1.sinks.s2.hdfs.idleTimeout=0
  40. a1.sinks.s2.hdfs.batchSize=100
  41. a1.sinks.s2.hdfs.fileType=DataStream
  42. a1.sinks.s2.hdfs.writeFormat=Text
  43. a1.sinks.s2.hdfs.round=true
  44. a1.sinks.s2.hdfs.roundValue=1
  45. a1.sinks.s2.hdfs.roundUnit=second
  46. a1.sinks.s2.hdfs.useLocalTimeStamp=true
  47. a1.sources.r1.channels=c1 c2
  48. a1.sinks.s1.channel=c1
  49. a1.sinks.s2.channel=c2

7.3.2 启动agent的服务:

  1. [root@master flumeconf]# flume-ng agent -c ../conf -f ./mul.conf -n a1 -Dflume.root.logger=INFO,console

7.3.3 测试:

  1. [root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://master:6666
  2. [root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my content"}]' http://master:6666

7.4、自定义拦截器

7.4.0、需求:

  1. 为了提高Flume的扩展性,用户可以自己定义一个拦截器
  2. eventbody中的数据,以数字开头的,存储为 hdfs://qianfeng01:8020/flume/number.log s1
  3. eventbody中的数据,以字母开头的,存储为 hdfs://qianfeng1:8020/flume/character.log s2
  4. eventbody中的数据,其他的开头的,存储为 hdfs: //qianfeng01:8020/flume/other.log s3

7.4.1、pom.xml

可以参考/code/pom. xml

  1. <!-- https: //mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
  2. <dependency>
  3. <groupId>org.apache.flume</groupId>
  4. <artifactId>flume-ng-core</artifactId>
  5. <version>1.8.0</version>
  6. </dependency>

7.4.2、代码

  1. public class MyInterceptor implements Interceptor {
  2. private static final String LOCATION_KEY = "location";
  3. private static final String LOCATION_NUMBER = "number";
  4. private static final String LOCATION_CHARACTER = "character";
  5. private static final String LOCATION_OTHER = "other";
  6. /**
  7. * 当拦截到单个Event的时候调用
  8. *
  9. * @param event 被拦截到的Event
  10. * @return 这个拦截器发送出去的Event,如果返回null,则这个Event将被丢弃
  11. */
  12. @Override
  13. public Event intercept(Event event) {
  14. // 1. 获取到拦截器拦截到Event的body部分
  15. byte[] body = event.getBody();
  16. // 2. 验证是否是以数字开头的
  17. if (body[0] > '0' && body[0] <= '9') {
  18. event.getHeaders().put(LOCATION_KEY, LOCATION_NUMBER);
  19. } else if (body[0] >= 'a' && body[0] <= 'z' || (body[0] >= 'A' && body[0] <= 'Z')) {
  20. event.getHeaders().put(LOCATION_KEY, LOCATION_CHARACTER);
  21. } else {
  22. event.getHeaders().put(LOCATION_KEY, LOCATION_OTHER);
  23. }
  24. return event;
  25. }
  26. @Override
  27. public List<Event> intercept(List<Event> list) {
  28. for (Event event : list) {
  29. intercept(event);
  30. }
  31. return list;
  32. }
  33. @Override
  34. public void initialize() {
  35. }
  36. @Override
  37. public void close() {
  38. }
  39. public static class MyBuilder implements Builder {
  40. @Override
  41. public Interceptor build() {
  42. return new MyInterceptor();
  43. }
  44. @Override
  45. public void configure(Context context) {
  46. }
  47. }
  48. }

7.4.3、打包上传

使用maven将拦截器打包,然后把此包和依赖的fastjson一起上传到flume lib目录下

7.4.4、编写方案

  1. a1.sources=r1
  2. a1.channels=c1 c2 c3
  3. a1.sinks=s1 s2 s3
  4. a1.sourres.r1.channels=c1 c2 c3
  5. a1.sinks.s1.channel=c1
  6. a1.sinks.s2.channel=c2
  7. a1.sinks.s3.channel=c3
  8. #设置source的属性
  9. a1.sources.r1.type=syslogtcp
  10. a1.sources.r1.host=master
  11. a1.sources.r1.port=12345
  12. #设置拦截器
  13. a1.sources.r1.interceptors=i1
  14. a1.sources.r1.interceptors.i1.type=com.qf.flume.MyInterceptor$MyBuilder
  15. #设置选择器的属性
  16. a1.sources.r1.selector.type=multiplexing
  17. a1.sources.r1.selector.header=location
  18. a1.sources.r1.selector.mapping.number=c1
  19. a1.sources.r1.selector.mapping.character=c2
  20. a1.sources.r1.selector.mapping.other=c3
  21. #设置channel的属性
  22. a1.channels.c1.type=memory
  23. a1.channels.c1.capacity=1000
  24. a1.channels.c2.type=memory
  25. a1.channels.c2.capacity=1000
  26. a1.channels.c3.type=memory
  27. a1.channels.c3.capacity=1000
  28. #设置sink的属性
  29. a1.sinks.s1.type=hdfs
  30. a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/customInterceptor/s1/%Y-%m-%d-%H
  31. a1.sinks.s1.hdfs.useLocalTimeStamp=true
  32. ai.sinks.s1.hdfs.filePrefix=regex
  33. a1.sinks.s1.hdfs.rollInterval=0
  34. a1.sinks.s1.hdfs.rollSize=102400
  35. a1.sinks.s1.hdfs.rollCount=30
  36. a1.sinks.s1.hdfs.fileType=DataStream
  37. a1.sinks.s1.hdfs.writeFormat=Text
  38. a1.sinks.s2.type=hdfs
  39. a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/customInterceptor/s2/%Y-%m-%d-%H
  40. a1.sinks.s2.hdfs.useLocalTimeStamp=true
  41. a1.sinks.s2.hdfs.filePrefix=regex
  42. a1.sinks.s2.hdfs.rollInterval=0
  43. a1.sinks.s2.hdfs.rollSize=102400
  44. a1.sinks.s2.hdfs.rollCount=30
  45. a1.sinks.s2.hdfs.fileType=DataStream
  46. a1.sinks.s2.hdfs.writeFormat=Text
  47. a1.sinks.s3.type=hdfs
  48. a1.sinks.s3.hdfs.path=hdfs://master:8020/flume/customInterceptor/s3/%Y-%m-%d-%H
  49. a1.sinks.s3.hdfs.useLocalTimeStamp=true
  50. a1.sinks.s3.hdfs.filePrefix=regex
  51. a1.sinks.s3.hdfs.rollInterval=0
  52. a1,sinks.s3.hdfs.rollSize=102400
  53. a1.sinks.s3.hdfs.rollCount=30
  54. a1.sinks.s3.hdfs.fileType=DataStream
  55. a1.sinks.s3.hdfs.writeFormat=Text

6.4.5、启动agent

  1. [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf/-f ./mytest.conf -n a1 -Dflume.root.logger=INF0,console

测试

CSDN 社区图书馆,开张营业!

深读计划,写书评领图书福利~

相关文章

最新文章

更多