Flume:搭建配置以及 source读取在netcat、http,sink 落实在本地、HDFS

x33g5p2x  于2021-11-29 转载在 Flume  
字(6.0k)|赞(0)|评价(0)|浏览(743)

1、下载创建并解压Flume

下载地址:http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

云盘链接:https://pan.baidu.com/s/1dFJ6uhgJcSUGwAHiIOrCDA
提取码:ycjf

下载好,上传到虚拟机节点,进行解压。先创建目录。

  1. # 创建目录
  2. mkdir -p /usr/flume
  3. cd /usr/flume
  4. # 解压到flume目录
  5. tar -zxvf ./apache-flume-1.7.0-bin.tar.gz -C /usr/flume/
  6. # cd /usr/flume/,重命名一下
  7. mv apache-flume-1.7.0-bin/ ./flume-1.7.0

2、配置文件

2.1 配置 flume-env.sh文件

  1. cd /usr/flume/flume-1.7.0/conf
  2. ls
  3. mv flume-env.sh.template flume-env.sh
  4. vi flume-env.sh
  5. ## 添加jdk路径:
  6. export JAVA_HOME=/usr/java/jdk1.8.0_171

2.2 配置Flume环境变量(三个节点)

在 /etc/profile 配置Flume环境变量。

  1. vi /etc/profile
  2. # flume
  3. export FLUME_HOME=/usr/flume/flume-1.7.0
  4. export PATH=$PATH:$FLUME_HOME/bin
  5. export FLUME_CONF_DIR=$FLUME_HOME/conf
  6. source /etc/profile

2.3 配置conf

通过netcat作为source, sink为logger的方式,配置example.conf。

  • agent进行重命名: a1
  • sources: r1
  • sinks: k1
  • channels: c1
  1. # 1、通过netcat作为source, sink为logger的方式
  2. # Name the components on this agent
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. # Describe/configure the source
  7. a1.sources.r1.type = netcat
  8. a1.sources.r1.bind = localhost
  9. a1.sources.r1.port = 44444
  10. # Describe the sink
  11. a1.sinks.k1.type = logger
  12. # Use a channel which buffers events in memory
  13. a1.channels.c1.type = memory
  14. a1.channels.c1.capacity = 1000
  15. a1.channels.c1.transactionCapacity = 100
  16. # Bind the source and sink to the channel
  17. a1.sources.r1.channels = c1
  18. a1.sinks.k1.channel = c1

cd /usr/flume/flume-1.7.0,在此目录下启动Flume。

  1. /usr/flume/flume-1.7.0/bin/flume-ng agent --conf conf --conf-file /usr/flume/flume-1.7.0/conf/example.conf -name a1 -Dflume.root.logger=INFO,console

再启动master节点,输入 telent localhost 44444

输入数据之后,另一个master就会接收到数据并提示!

当关闭flume,telnet也会自动断开。

2.4 分发文件

  1. scp -r /usr/flume/ root@slave1:/usr/
  2. scp -r /usr/flume/ root@slave2:/usr/

3、Flume的conf 多种部署

3.1 需求:显示结果是过滤数据

通过netcat作为source, sink为logger的方式,现在我之关心字母,过滤掉数字。

重新编辑 example.conf,添加正则表达。

  1. # 添加source定义正则匹配规则
  2. a1.sources.r1.interceptors = i1
  3. a1.sources.r1.interceptors.i1.type =regex_filter
  4. a1.sources.r1.interceptors.i1.regex =^[0-9]*$
  5. a1.sources.r1.interceptors.i1.excludeEvents =true

保存退出,再启动flume。

  1. ./bin/flume-ng agent --conf conf --conf-file ./conf/example.conf -name a1 -Dflume.root.logger=INFO,console

telnet localhost 44444,输入数据 12,11,bad,hello java。

可以发现接收的数据是过滤了数字!!。

3.2 需求:通过netcat作为source, sink写到hdfs

编辑:vi examp.conf

  1. # Name the components on this agent 针对agent重命名为a1
  2. a1.sources = r1 # source别名为 r1
  3. a1.sinks = k1 # sinks别名为 k1
  4. a1.channels = c1 # channel别名为 c1
  5. # Describe/configure the source
  6. a1.sources.r1.type = netcat
  7. a1.sources.r1.bind = localhost
  8. a1.sources.r1.port = 44444
  9. # source定义正则匹配规则
  10. a1.sources.r1.interceptors = i1
  11. a1.sources.r1.interceptors.i1.type =regex_filter
  12. a1.sources.r1.interceptors.i1.regex =^[0-9]*$
  13. a1.sources.r1.interceptors.i1.excludeEvents =true
  14. # Describe the sink
  15. # a1.sinks.k1.type = logger
  16. a1.channels = c1
  17. a1.sinks = k1
  18. # sink为hdfs
  19. a1.sinks.k1.type = hdfs
  20. # 数据存储到hdfs的文件路径
  21. a1.sinks.k1.hdfs.path = hdfs:/flume
  22. # 表示最终的文件前缀
  23. a1.sinks.k1.hdfs.filePrefix = events
  24. # 表示到了需要触发的时间时,是否要更新文件夹,true:表示是
  25. a1.sinks.k1.hdfs.round = true
  26. a1.sinks.k1.hdfs.roundValue = 10
  27. # 表示切换时间的单位是分钟
  28. a1.sinks.k1.hdfs.roundUnit = minute
  29. # 表示过了一分钟生成一个文件
  30. a1.sinks.k1.hdfs.roundInterval = 60
  31. a1.sinks.k1.hdfs.fileType = DataStream
  32. # Use a channel which buffers events in
  33. # memory channel配置
  34. a1.channels.c1.type = memory
  35. a1.channels.c1.capacity = 1000
  36. a1.channels.c1.transactionCapacity = 100
  37. # Bind the source and sink to the channel
  38. # 将source channel sink进行串联起来
  39. a1.sources.r1.channels = c1
  40. a1.sinks.k1.channel = c1

启动flume,在example.conf配置中,我把每隔一分钟输入的数据生成一个文件到HDFS的flume目录下,文件前缀名有event。

  1. ./bin/flume-ng agent --conf conf --conf-file ./conf/example.conf -name a1 -Dflume.root.logger=INFO,console

可以看到一分钟内它还是个临时文件tmp,等一分钟后就变成一个文件在HDFS了。
查看HDFS是否有flume保存的文件。

  1. # 如果事先HDFS已经创建了flume,要先删除
  2. hadoop fs -ls /flume
  3. hadoop fs -cat /flume/events.1638113932881

可以发现,我们是把sink的数据存到HDFS了。
我们有发现一个问题:就是假如一分钟内输入的flume数据过小,那就极大浪费存储空间,我们需要限定一下,设置flume防止小文件。

  1. # 1、限定一个文件的文件数据大小
  2. a1.sinks.k1.hdfs.rollSize = 200*1024*1024
  3. # 2、限定文件可以存储多少个event
  4. a1.sinks.k1.hdfs.rollCount = 10000

3.3 通过HTTP作为source, sink写到logger

创建http.conf, 在/usr/flume/flume-1.7.0/conf,vi http.conf

  1. # HTTP.conf
  2. # Name the components on this agent
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. # Describe/configure the source
  7. a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
  8. a1.sources.r1.bind = master
  9. a1.sources.r1.port = 50020
  10. #a1.sources.r1.fileHeader = true
  11. # Describe the sink
  12. a1.sinks.k1.type = logger
  13. # Use a channel which buffers events in memory
  14. a1.channels.c1.type = memory
  15. a1.channels.c1.capacity = 1000
  16. a1.channels.c1.transactionCapacity = 100
  17. # Bind the source and sink to the channel
  18. a1.sources.r1.channels = c1
  19. a1.sinks.k1.channel = c1

配置完成,启动Flume。在 /usr/flume/flume-1.7.0 目录。

  1. ./bin/flume-ng agent --conf conf --conf-file ./conf/http.conf -name a1 -Dflume.root.logger=INFO,console

输入数据:

  1. curl -X POST -d '[{"headers" : {"timestamp" : "434324343","host" : "random_host.example.com"},"body" : "random_body"},{"headers" : {"namenode" : "namenode.example.com","datanode" : "random_datanode.example.com"},"body" : "badou,badou"}]' master:50020

这个命令也可以在其他节点执行。http是允许在不同结点发送消息给source的。

3.4 多节点进行串联

在slave1 编辑文件pull.conf
vi pull.conf

  1. #Name the components on this agent
  2. a2.sources= r1
  3. a2.sinks= k1
  4. a2.channels= c1
  5. #Describe/configure the source
  6. a2.sources.r1.type= avro
  7. a2.sources.r1.channels= c1
  8. a2.sources.r1.bind= slave1
  9. a2.sources.r1.port= 44444
  10. #Describe the sink
  11. a2.sinks.k1.type= logger
  12. a2.sinks.k1.channel = c1
  13. #Use a channel which buffers events in memory
  14. a2.channels.c1.type= memory
  15. a2.channels.c1.keep-alive= 10
  16. a2.channels.c1.capacity= 100000
  17. a2.channels.c1.transactionCapacity= 100000

在master节点编辑 vi push.conf

  1. #Name the components on this agent
  2. a1.sources= r1
  3. a1.sinks= k1
  4. a1.channels= c1
  5. #Describe/configure the source
  6. a1.sources.r1.type= netcat
  7. a1.sources.r1.bind= localhost
  8. a1.sources.r1.port = 44444
  9. a1.sources.r1.channels= c1
  10. #Use a channel which buffers events in memory
  11. a1.channels.c1.type= memory
  12. a1.channels.c1.keep-alive= 10
  13. a1.channels.c1.capacity= 100000
  14. a1.channels.c1.transactionCapacity= 100000
  15. #Describe/configure the source
  16. a1.sinks.k1.type= avro
  17. a1.sinks.k1.channel= c1
  18. a1.sinks.k1.hostname= slave1
  19. a1.sinks.k1.port= 44444

  1. # 1、slave1启动:
  2. ./bin/flume-ng agent -c conf -f conf/pull.conf -n a2 -Dflume.root.logger=INFO,console
  3. # 2、master启动:
  4. ./bin/flume-ng agent -c conf -f conf/push.conf -n a1 -Dflume.root.logger=INFO,console
  5. master上执行 telnet localhost 44444

可以发现我们在master输入的数据,在salve1 进行显示。

相关文章