Flume: Kafka + Flume Data Integration

1. Start the cluster

Start ZooKeeper, Hadoop (on the master node), and Kafka; Flume itself is started later in step 3.

  # 1. On all three nodes
  /usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh start
  /usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh status
  # 2. On the master node, start Hadoop
  /usr/hadoop/hadoop-2.7.3/sbin/start-all.sh
  # 3. Start Kafka (on all three nodes)
  cd /usr/kafka/kafka_2.11-2.4.0/
  bin/kafka-server-start.sh config/server.properties

At this point ZooKeeper, Hadoop, and Kafka are all running; a quick connectivity check is sketched below.

For Kafka installation and configuration, see the earlier post: Kafka集群分布式部署与测试 (Kafka cluster distributed deployment and testing).
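Before moving on, it can help to confirm that the ZooKeeper and Kafka ports are actually listening on every node. The sketch below is a minimal check using only the Python standard library; the host names and ports come from the cluster described above.

  import socket

  # Service endpoints started above: ZooKeeper (2181) and Kafka (9092) on all three nodes
  endpoints = [
      ("master", 2181), ("slave1", 2181), ("slave2", 2181),
      ("master", 9092), ("slave1", 9092), ("slave2", 9092),
  ]

  for host, port in endpoints:
      try:
          # A successful TCP connect means the daemon is at least listening
          sock = socket.create_connection((host, port), timeout=3)
          sock.close()
          print("{}:{} is up".format(host, port))
      except socket.error as err:
          print("{}:{} is NOT reachable: {}".format(host, port, err))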

2. Create the Kafka topic

  # Create the topic badou_data
  bin/kafka-topics.sh --create --topic badou_data --partitions 3 --replication-factor 2 --zookeeper master:2181,slave1:2181,slave2:2181
  # Consume from badou_data
  bin/kafka-console-consumer.sh --from-beginning --topic badou_data --bootstrap-server master:9092,slave1:9092,slave2:9092
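The console consumer above is enough to eyeball the topic. If the kafka-python package happens to be installed (an assumption, it is not part of the steps above), the same check can also be scripted:

  from kafka import KafkaConsumer  # assumes the kafka-python package is installed

  # Connect to the brokers used above and list the topics they know about
  consumer = KafkaConsumer(bootstrap_servers="master:9092,slave1:9092,slave2:9092")
  print("badou_data exists: {}".format("badou_data" in consumer.topics()))
  consumer.close()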

3. Edit the Flume configuration

cd /usr/flume/flume-1.7.0
vi conf/flume_kafka.conf

  # Name the components on this agent
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  # Describe/configure the source
  a1.sources.r1.type = exec
  a1.sources.r1.command = tail -f /usr/flume/flume-1.7.0/day6/flume_exec_test.txt
  # a1.sinks.k1.type = logger
  # Configure the Kafka sink
  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  # Kafka broker address and port
  a1.sinks.k1.brokerList = master:9092
  # Kafka topic to write to
  a1.sinks.k1.topic = badou_data
  # Serializer for the messages
  a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
  # Use a channel which buffers events in memory
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 100000
  a1.channels.c1.transactionCapacity = 1000
  # Bind the source and sink to the channel
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1

The sink points at the broker master:9092. Now start Flume:

  cd /usr/flume/flume-1.7.0
  ./bin/flume-ng agent --conf conf --conf-file ./conf/flume_kafka.conf --name a1 -Dflume.root.logger=INFO,console
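Before generating real data, you can append a single test line to the tailed file; it should show up in the console consumer started in step 2 within a few seconds. A minimal sketch (the payload is only an illustration):

  import json

  # Path tailed by the exec source in flume_kafka.conf
  log_path = "/usr/flume/flume-1.7.0/day6/flume_exec_test.txt"

  # Append one test record; Flume should forward it to the badou_data topic
  with open(log_path, "a") as f:
      f.write(json.dumps({"test": "flume-to-kafka smoke test"}) + "\n")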

4. Clear the log file and run the Python script

  cd /usr/flume/flume-1.7.0/day6
  echo '' > flume_exec_test.txt

Run python flume_data_write.py to simulate backend logs being written into the log file:

  import random
  import time
  import pandas as pd
  import json

  # File tailed by the Flume exec source
  writeFileName = "/usr/flume/flume-1.7.0/day6/flume_exec_test.txt"
  cols = ["order_id", "user_id", "eval_set", "order_number", "order_dow", "hour", "day"]

  # Load the order data and replace missing values with 0
  df1 = pd.read_csv('/root/day3/orders.csv')
  df1.columns = cols
  df = df1.fillna(0)

  # Append each row to the log file as one JSON record per line
  with open(writeFileName, 'a+') as wf:
      for idx, row in df.iterrows():
          d = {}
          for col in cols:
              d[col] = row[col]
          # default=str handles numpy scalar types that json cannot serialize directly
          js = json.dumps(d, default=str)
          wf.write(js + '\n')

Once the script is running, the records it appends to flume_exec_test.txt flow through Flume into the badou_data topic, and the Kafka console consumer started in step 2 prints them continuously.
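If you would rather consume programmatically than with the console consumer, a minimal sketch using the kafka-python package (again an assumption about the environment) looks like this:

  import json
  from kafka import KafkaConsumer  # assumes the kafka-python package is installed

  # Read the order records that Flume pushed into the badou_data topic
  consumer = KafkaConsumer(
      "badou_data",
      bootstrap_servers="master:9092,slave1:9092,slave2:9092",
      auto_offset_reset="earliest",
  )
  for msg in consumer:
      record = json.loads(msg.value.decode("utf-8"))
      print(record["order_id"], record["user_id"])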
