在企业中,大数据的处理流程一般是:
参考下图:
在数据采集和搜集的工具中,Flume框架占有一定的市场份量。
Flume 是一种分布式、可靠且可用的服务,用于高效收集、聚合和移动大量日志数据。它具有基于流数据流的简单灵活的架构。它具有可调整的可靠性机制以及许多故障转移和恢复机制,具有健壮性和容错性。它使用允许在线分析应用程序的简单可扩展数据模型。
参考官网: http://flume.apache.org/
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
flume 最开始是由 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。但随着 flume 功能的扩展,flume的代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点渐渐暴露出来,尤其是在发行版本 0.9.4中,日志传输不稳定的现象尤为严重。
为了解决这些问题,2011 年 10 月 22 号,cloudera 对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,并将 Flume 纳入 apache 旗下,从cloudera Flume 改名为 Apache Flume。
为了与之前版本区分开,重构后的版本统称为 Flume NG(next generation),重构前的版本被统称为 Flume OG(original generation),Flume目前只有Linux系统的启动脚本,没有Windows环境的启动脚本。
Flume 运行的核心是 Agent。Flume是以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方。如下图所示:
- Client:
客户端,Client生产数据,运行在一个独立的线程中
- Event:
一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。)
- Flow:
Event从源点到达目的点的迁移的抽象。
- Agent:
一个独立的Flume进程,运行在JVM中,包含组件Source、 Channel、 Sink。
每台机器运行一个agent,但是一个agent中可以包含多个sources和sinks。
- Source:
数据收集组件。source从Client收集数据,传递给Channel
- Channel:
管道,负责接收source端的数据,然后将数据推送到sink端。
- Sink:
负责从channel端拉取数据,并将其推送到持久化系统或者是下一个Agent。
- selector:
选择器,作用于source端,然后决定数据发往哪个目标。
- interceptor:
拦截器,flume允许使用拦截器拦截数据。允许使用拦截器链,作用于source和sink阶段。
3.1、安装和配置环境变量
3.1.1、准备软件包
将apache-flume-1.8.0-bin.tar.gz 上传到linux系统中的/root/soft/目录中
3.1.2、解压软件包
[root@master soft]# pwd
/root/soft
[root@master soft]# tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local/
3.1.3、更名操作
[root@master soft]# cd /usr/local/
[root@master local]# mv apache-flume-1.8.0-bin/ flume
3.1.4、配置环境变量
[root@master local]# vi /etc/profile
........省略..........
export FLUME_HOME=/usr/local/flume
export PATH=$FLUME_HOME/bin:$PATH
# 加载环境变量
[root@master apps]# source /etc/profile
3.1.5、验证环境变量
[root@master local]# flume-ng version
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d
3.2、配置文件
[root@master local]# cd flume/conf/
[root@master conf]# ll #查看里面是否有一个flume-env.sh.template文件
[root@master conf]# cp flume-env.sh.template flume-env.sh
[root@master conf]# vim flume-env.sh
........省略..........
export JAVA_HOME=/usr/local/jdk
........省略..........
在单个 Agent 内由单个 Source, Channel, Sink 建立一个单一的数据流模型,如下图所示,整个数据流为
Web Server --> Source --> Channel --> Sink --> HDFS。
**1)**多 Agent 串行传输数据流模型
**2)**多 Agent 汇聚数据流模型
**3)**单 Agent 多路数据流模型
**4)**Sinkgroups 数据流模型
在flume提供的数据流模型中,几个原则很重要。
Source--> Channel
1.单个Source组件可以和多个Channel组合建立数据流,既可以replicating 和 multiplexing。
2.多个Sources可以写入单个 Channel
Channel-->Sink
1.多个Sinks又可以组合成Sinkgroups从Channel中获取数据,既可以loadbalancing和failover机制。
2.多个Sinks也可以从单个Channel中取数据。
3.单个Sink只能从单个Channel中取数据
根据上述 5 个原则,你可以设计出满足你需求的数据流模型。
要定义单个代理中的流,您需要通过通道链接源和接收器。您需要列出给定代理的源,接收器和通道,然后将源和接收器指向一个通道。一个源实例可以指定多个通道,但是一个接收器实例只能指定一个通道。格式如下:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
案例如下:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>
# properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>
案例如下:
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# set channel for sources, sinks
# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
#...
# Avro source:
avro
# Syslog TCP source:
syslogtcp
# Syslog UDP Source:
syslogudp
# HTTP Source:
http
# Exec source:
exec
# JMS source:
jms
# Thrift source:
thrift
# Spooling directory source:
spooldir
# Kafka source:
org.apache.flume.source.kafka,KafkaSource
.....
# Memory Channel
memory
# JDBC Channel
jdbc
# Kafka Channel
org.apache.flume.channel.kafka.KafkaChannel
# File Channel
file
# HDFS Sink
hdfs
# HIVE Sink
hive
# Logger Sink
logger
# Avro Sink
avro
# Kafka Sink
org.apache.flume.sink.kafka.KafkaSink
# Hbase Sink
hbase
配置可参考:https://flume.apache.org/releases/content/1.10.0/FlumeUserGuide.html
Avro Source:监听一个指定的Avro端口,通过Avro端口可以获取到Avro client发送过来的文件,即只要应用程序通过Avro端口发送文件,source组件就可以获取到该文件中的内容,输出位置为Logger
5.1.1、编写采集方案
[root@master flume]# mkdir flumeconf
[root@master flume]# cd flumeconf
[root@master flumeconf]# vi avro-logger.conf
#定义各个组件的名字
a1.sources=avro-sour1
a1.channels=mem-chan1
a1.sinks=logger-sink1
#定义sources组件的相关属性
a1.sources.avro-sour1.type=avro
a1.sources.avro-sour1.bind=master
a1.sources.avro-sour1.port=9999
#定义channels组件的相关属性
a1.channels.mem-chan1.type=memory
#定义sinks组件的相关属性
a1.sinks.logger-sink1.type=logger
a1.sinks.logger-sink1.maxBytesToLog=100
#组件之间进行绑定
a1.sources.avro-sour1.channels=mem-chan1
a1.sinks.logger-sink1.channel=mem-chan1
5.1.2、启动Agent
[root@master flumeconf]# flume-ng agent -c ../conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
5.1.3、测试数据
另起一个窗口测试
[root@master ~]# mkdir flumedata
[root@master ~]# cd flumedata/
[root@master flumedata]#
[root@master flumedata]# date >> test.data
[root@master flumedata]# cat test.data
2019年 11月 21日 星期四 21:22:36 CST
[root@master flumedata]# ping master >> test.data
[root@master flumedata]# cat test.data
....省略....
[root@master flumedata]# flume-ng avro-client -c /usr/local/flume-1.6.0/conf/ -H master -p 9999 -F ./test.dat
Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
#常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容
memory:传输数据的Channel为Memory
hdfs 是输出目标为Hdfs
5.2.1、配置方案
[root@master flumeconf]# vim exec-hdfs.conf
# 定义sources的属性
a1.sources=r1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /root/flumedata/test.data
# 定义sinks的属性
a1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://master:8020/flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix=events
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=second
a1.sinks.k1.hdfs.rollInterval=3
a1.sinks.k1.hdfs.rollSize=20
a1.sinks.k1.hdfs.rollCount=5
a1.sinks.k1.hdfs.batchSize=1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.fileType=DataStream
# 定义channels的属性
a1.channels
channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# 组件之间的绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
5.2.2、启动Agent
[root@master flumeconf]# flume-ng agent -c ../conf -f ./exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
5.2.3、测试数据
[root@master flumedata]# ping master >> test.data
spool: Source来源于目录,有文件进入目录就摄取。
mem:通过内存来传输数据
logger:是传送数据到日志
5.3.1、配置方案
[root@master flumeconf]# vi spool-logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir = /home/flume/spool
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.deletePolicy=never
a1.sources.r1.fileHeader=false
a1.sources.r1.fileHeaderKey=file
a1.sources.r1.basenameHeader=false
a1.sources.r1.basenameHeaderKey=basename
a1.sources.r1.batchSize=100
a1.sources.r1.inputCharset=UTF-8
a1.sources.r1.bufferMaxLines=1000
a1.channels.c1.type=memory
a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=100
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
5.3.2、启动agent
[root@master flumeconf]# flume-ng agent -c ../conf -f ./spool-logger.conf -n a1 -Dflume.root.logger=INFO,console
5.3.3、测试
[root@master ~]# for i in `seq 1 10`; do echo $i >> /home/flume/spool/$i;done
http: 表示数据来源是http网络协议,一般接收的请求为get或post请求. 所有的http请求会通过插件格式的Handle转化为一个flume的Event数据.
mem:表示用内存传输通道
logger:表示输出格式为Logger格式
5.4.1、配置方案
[root@master flumeconf]# vi http-logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type=http
a1.sources.r1.bind = master
a1.sources.r1.port = 6666
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
a1.channels.c1.type=memory
a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=16
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
5.4.2、启动agent的服务
[root@master ~]# flume-ng agent -c ../conf -f ./http-logger.conf -n a1 -Dflume.root.logger=INFO,console
5.4.3、测试
[root@master ~]# curl -X POST -d '[{"headers":{"name":"zhangsan","pwd":"123456"},"body":"this is my content"}]' http://master:6666
在Flume运行过程中,Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。拦截器有下面几个特点:
通过时间拦截器,数据源为SyslogTcp,传送的通道模式是FileChannel,最后输出的目的地为HDFS
6.2.1 配置方案:
[root@master flumeconf]# vi ts.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=master
a1.sources.r1.port=6666
a1.sources.r1.interceptors=i1 i2 i3
a1.sources.r1.interceptors.i1.type=timestamp
a1.sources.r1.interceptors.i1.preserveExisting=false
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.preserveExisting=false
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i2.hostHeader=hostname
a1.sources.r1.interceptors.i3.type=static
a1.sources.r1.interceptors.i3.preserveExisting=false
a1.sources.r1.interceptors.i3.key=hn
a1.sources.r1.interceptors.i3.value=master
a1.channels.c1.type=memory
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
6.2.2 启动agent的服务:
[root@master flumeconf]# flume-ng agent -c ../conf -f ./ts.conf -n a1 -Dflume.root.logger=INFO,console
6.2.3 测试:
[root@master ~]# echo "hello world hello qiangeng" | nc master 6666
拦截器为正则表达式拦截器, 数据源为Syslogtcp格式,传送通道为MemChannel,最后传送的目的地是HDFS
6.3.1 配置方案
[root@master flumeconf]# vi regex-ts.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_filter
#不要加引号包裹正则
a1.sources.r1.interceptors.i1.regex=^[0-9].*$
a1.sources.r1.interceptors.i1.excludeEvents=false
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
6.3.2、启动agent的服务:
[root@master flumeconf]# flume-ng agent -c ../conf -f ./regex-ts.conf -n a1 -Dflume.root.logger=INFO,console
6.3.3、测试:
[root@master ~]# echo "hello world hello qiangeng" | nc master 6666
[root@master ~]# echo "123123123 hello world hello qiangeng" | nc master 6666
6.4.0 需求:
为了提高Flume的扩展性,用户可以自己定义一个拦截器
将event的body中的数据, 以数字开头的, 存储为 hdfs://master:8020/flume/number.log s1
将event的body中的数据, 以字母开头的, 存储为 hdfs://master:8020/flume/character.log s2
将event的body中的数据, 其他的开头的, 存储为 hdfs://master:8020/flume/other.log s3
6.4.1 pom.xml
可以参考/code/pom.xml
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>
6.4.2 代码
具体代码可以参考 code/MyInterceptor
public class MyInterceptor implements Interceptor {
private static final String LOCATION_KEY = "location";
private static final String LOCATION_NUM = "number";
private static final String LOCATION_CHAR = "character";
private static final String LOCATION_OTHER = "other";
@Override
public void initialize() {
}
/**
* 当拦截到单个Event的时候调用
* @param event 这个被拦截的Event
* @return 这个拦截器发送出去的Event
*/
@Override
public Event intercept(Event event) {
// 对拦截到的event的header中添加指定的键值对
// 1. 获取拦截到的event的body
byte[] body = event.getBody();
// 2. 验证是否是以数字开头
if (body[0] >= '0' && body[0] <= '9') {
// 在这个event的头部添加一个键值对,来标识这个event需要放入到哪一个channel中
event.getHeaders().put(LOCATION_KEY, LOCATION_NUM);
}
else if (body[0] >= 'a' && body[0] <= 'z' || body[0] >= 'A' && body[0] <= 'Z') {
event.getHeaders().put(LOCATION_KEY, LOCATION_CHAR);
}
else {
event.getHeaders().put(LOCATION_KEY, LOCATION_OTHER);
}
return event;
}
/**
* 批量拦截到Event
* @param list 存储Event的集合
* @return 处理之后的Event集合
*/
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
/**
* 设计一个获取拦截器对象的类
*/
public static class MyBuilder implements Builder {
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
6.4.3 打包上传
使用maven将拦截器打包,然后把此包和依赖的fastjson一起上传到flume lib目录下
6.4.4 编写方案
a1.sources=r1
a1.channels=c1 c2 c3
a1.sinks=s1 s2 s3
a1.sources.r1.channels=c1 c2 c3
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3
# 设置source的属性
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=master
a1.sources.r1.port=12345
# 设置拦截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.qf.MyInterceptor$MyBuilder
# 设置选择器的属性
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=location
a1.sources.r1.selector.mapping.number=c1
a1.sources.r1.selector.mapping.character=c2
a1.sources.r1.selector.mapping.other=c3
# 设置channel的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000
# 设置sink的属性
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/customInterceptor/s1/%Y-%m-%d-%H
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=regex
a1.sinks.s1.hdfs.rollInterval=0
a1.sinks.s1.hdfs.rollSize=102400
a1.sinks.s1.hdfs.rollCount=30
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/customInterceptor/s2/%Y-%m-%d-%H
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=regex
a1.sinks.s2.hdfs.rollInterval=0
a1.sinks.s2.hdfs.rollSize=102400
a1.sinks.s2.hdfs.rollCount=30
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=hdfs://master:8020/flume/customInterceptor/s3/%Y-%m-%d-%H
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s3.hdfs.filePrefix=regex
a1.sinks.s3.hdfs.rollInterval=0
a1.sinks.s3.hdfs.rollSize=102400
a1.sinks.s3.hdfs.rollCount=30
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.writeFormat=Text
6.4.5 启动agent
[root@master flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console
6.4.6 测试:
Flume中的Channel选择器作用于source阶段 ,是决定Source接受的特定事件写入到哪个Channel的组件,他们告诉Channel处理器,然后由其将事件写入到Channel。
Agent中各个组件的交互
由于Flume不是两阶段提交,事件被写入到一个Channel,然后事件在写入下一个Channel之前提交,如果写入一个Channel出现异常,那么之前已经写入到其他Channel的相同事件不能被回滚。当这样的异常发生时,Channel处理器抛出ChannelException异常,事务失败,如果Source试图再次写入相同的事件(大多数情况下,会再次写入,只有Syslog,Exec等Source不能重试,因为没有办法生成相同的数据),重复的事件将写入到Channel中,而先前的提交是成功的,这样在Flume中就发生了重复。
Channel选择器的配置是通过Channel处理器完成的,Channel选择器可以指定一组Channel是必须的,另一组的可选的。
Flume分类两种选择器,如果Source配置中没有指定选择器,那么会自动使用复制Channel选择器.
7.2.1 配置方案
[root@master flumeconf]# vi rep.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2
a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=replicating
a1.channels.c1.type=memory
a1.channels.c2.type=memory
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/rep
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/rep
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
7.2.2 启动agent的服务:
[root@master flumeconf]# flume-ng agent -c ../conf -f ./rep.conf -n a1 -Dflume.root.logger=INFO,console
7.2.3 测试:
[root@master ~]# echo "hello world hello qianfeng" | nc master 6666
7.3.1 配置方案
[root@master flumeconf]# vi mul.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2
a1.sources.r1.type=http
a1.sources.r1.bind = master
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
a1.sources.r1.selector.default = c1
a1.channels.c1.type=memory
a1.channels.c2.type=memory
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/mul
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
7.3.2 启动agent的服务:
[root@master flumeconf]# flume-ng agent -c ../conf -f ./mul.conf -n a1 -Dflume.root.logger=INFO,console
7.3.3 测试:
[root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://master:6666
[root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my content"}]' http://master:6666
7.4.0、需求:
为了提高Flume的扩展性,用户可以自己定义一个拦截器
将event的body中的数据,以数字开头的,存储为 hdfs://qianfeng01:8020/flume/number.log s1
将event的body中的数据,以字母开头的,存储为 hdfs://qianfeng1:8020/flume/character.log s2
将event的body中的数据,其他的开头的,存储为 hdfs: //qianfeng01:8020/flume/other.log s3
7.4.1、pom.xml
可以参考/code/pom. xml
<!-- https: //mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
7.4.2、代码
public class MyInterceptor implements Interceptor {
private static final String LOCATION_KEY = "location";
private static final String LOCATION_NUMBER = "number";
private static final String LOCATION_CHARACTER = "character";
private static final String LOCATION_OTHER = "other";
/**
* 当拦截到单个Event的时候调用
*
* @param event 被拦截到的Event
* @return 这个拦截器发送出去的Event,如果返回null,则这个Event将被丢弃
*/
@Override
public Event intercept(Event event) {
// 1. 获取到拦截器拦截到Event的body部分
byte[] body = event.getBody();
// 2. 验证是否是以数字开头的
if (body[0] > '0' && body[0] <= '9') {
event.getHeaders().put(LOCATION_KEY, LOCATION_NUMBER);
} else if (body[0] >= 'a' && body[0] <= 'z' || (body[0] >= 'A' && body[0] <= 'Z')) {
event.getHeaders().put(LOCATION_KEY, LOCATION_CHARACTER);
} else {
event.getHeaders().put(LOCATION_KEY, LOCATION_OTHER);
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void initialize() {
}
@Override
public void close() {
}
public static class MyBuilder implements Builder {
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
7.4.3、打包上传
使用maven将拦截器打包,然后把此包和依赖的fastjson一起上传到flume lib目录下
7.4.4、编写方案
a1.sources=r1
a1.channels=c1 c2 c3
a1.sinks=s1 s2 s3
a1.sourres.r1.channels=c1 c2 c3
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3
#设置source的属性
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=master
a1.sources.r1.port=12345
#设置拦截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.qf.flume.MyInterceptor$MyBuilder
#设置选择器的属性
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=location
a1.sources.r1.selector.mapping.number=c1
a1.sources.r1.selector.mapping.character=c2
a1.sources.r1.selector.mapping.other=c3
#设置channel的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000
#设置sink的属性
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/customInterceptor/s1/%Y-%m-%d-%H
a1.sinks.s1.hdfs.useLocalTimeStamp=true
ai.sinks.s1.hdfs.filePrefix=regex
a1.sinks.s1.hdfs.rollInterval=0
a1.sinks.s1.hdfs.rollSize=102400
a1.sinks.s1.hdfs.rollCount=30
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/customInterceptor/s2/%Y-%m-%d-%H
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=regex
a1.sinks.s2.hdfs.rollInterval=0
a1.sinks.s2.hdfs.rollSize=102400
a1.sinks.s2.hdfs.rollCount=30
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=hdfs://master:8020/flume/customInterceptor/s3/%Y-%m-%d-%H
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s3.hdfs.filePrefix=regex
a1.sinks.s3.hdfs.rollInterval=0
a1,sinks.s3.hdfs.rollSize=102400
a1.sinks.s3.hdfs.rollCount=30
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.writeFormat=Text
6.4.5、启动agent
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf/-f ./mytest.conf -n a1 -Dflume.root.logger=INF0,console
测试
CSDN 社区图书馆,开张营业!
深读计划,写书评领图书福利~
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/su2231595742/article/details/125343248
内容来源于网络,如有侵权,请联系作者删除!