Flume HDFS sink with netcat source only stores one line of the source data

quhf5bfb  posted 2021-05-29 in Hadoop

I am trying to load data into HDFS with Flume 1.7. I created the following configuration:


# Starting with: /opt/flume/bin/flume-ng agent -n Agent -c conf -f /opt/flume/conf/test.conf

# Naming the components on the current agent

Agent.sources = Netcat   
Agent.channels = MemChannel 
Agent.sinks = LoggerSink hdfs-sink LocalOut

# Describing/Configuring the source

Agent.sources.Netcat.type = netcat 
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565  

# Describing/Configuring the sink

Agent.sinks.LoggerSink.type = logger  

# Define a sink that outputs to hdfs.

Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>:8020/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
# A value of 0 disables size-, count-, time- and idle-based file rolling
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 0
Agent.sinks.hdfs-sink.hdfs.rollInterval = 0
Agent.sinks.hdfs-sink.hdfs.idleTimeout = 0

# Writes input into the local filesystem

# http://flume.apache.org/FlumeUserGuide.html#file-roll-sink

Agent.sinks.LocalOut.type = file_roll  
Agent.sinks.LocalOut.sink.directory = /tmp/flume
Agent.sinks.LocalOut.sink.rollInterval = 0  

# Describing/Configuring the channel

Agent.channels.MemChannel.type = memory 
Agent.channels.MemChannel.capacity = 1000 
Agent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel

Agent.sources.Netcat.channels = MemChannel
Agent.sinks.LoggerSink.channel = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel
Agent.sinks.LocalOut.channel = MemChannel

Afterwards, I sent the following file to the source using netcat:

cat textfile.csv | nc <IP of flume agent> 56565

The file contains entries of the following form:

Name1,1
Name2,2
Name3,3
Name4,4
Name5,5
Name6,6
Name7,7
Name8,8
Name9,9
Name10,10
Name11,11
Name12,12
Name13,13
Name14,14
Name15,15
Name16,16
Name17,17
Name18,18
Name19,19
Name20,20
...
Name490,490
Name491,491
Name492,492

The problem I am facing: Flume writes to HDFS without any error, but transfers only a single line of the file. If you push the file to the source multiple times with netcat, Flume sometimes writes several files to HDFS containing multiple lines, but rarely all of them.
I tried changing rollSize, batchSize and the other HDFS parameters, but that did not really change the behaviour.
The sink writing to the local filesystem with the same configuration works fine.
Does anyone know how to configure this so that all entries are written to HDFS without losing any?
Thanks for your help.
Update 1.12.2016
I removed all sinks except the HDFS sink and changed some parameters. After that, the HDFS sink behaved as it should.
The configuration now looks like this:


# Naming the components on the current agent

Agent.sources = Netcat   
Agent.channels = MemChannel 
Agent.sinks = hdfs-sink 

# Describing/Configuring the source

Agent.sources.Netcat.type = netcat 
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565  

# Define a sink that outputs to hdfs.

Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 100

# Describing/Configuring the channel

Agent.channels.MemChannel.type = memory 
Agent.channels.MemChannel.capacity = 1000 
Agent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel

Agent.sources.Netcat.channels = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel

Does anyone know why it works with this configuration, but no longer works with two or more sinks?


3lxsmp7m1#

I found the solution myself. As far as I understand it, I was using the same channel for both sinks. As a result, the faster sink took all the entries, and only some of them were delivered to the HDFS sink.
After using a separate channel per sink and setting the source's fan-out parameter
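The effect described in the answer can be illustrated with a small sketch (plain Python, not Flume code): when two consumers drain one shared queue, each event goes to exactly one of them, whereas replication puts a copy of every event on each channel.

```python
# Sketch: two sinks draining ONE channel split the events between them,
# while a replicating source gives every channel a full copy.
from queue import Queue

events = [f"Name{i},{i}" for i in range(1, 11)]

# One shared channel, two sinks: each event is taken by exactly one sink.
shared = Queue()
for e in events:
    shared.put(e)

fast_sink, slow_sink = [], []
while not shared.empty():
    fast_sink.append(shared.get())      # one sink takes an event...
    if not shared.empty():
        slow_sink.append(shared.get())  # ...the other only gets what is left

# The events were split, not copied: neither sink saw the full stream.
assert len(fast_sink) + len(slow_sink) == len(events)

# Replicating: the source puts every event on BOTH channels,
# so each sink sees all entries.
chan_a, chan_b = Queue(), Queue()
for e in events:
    chan_a.put(e)
    chan_b.put(e)

assert chan_a.qsize() == len(events) and chan_b.qsize() == len(events)
```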

Agent.sources.Netcat.selector.type = replicating

Flume writes to the local file and to HDFS as expected.
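Putting the fix together, the multi-sink part of the original configuration could look like the sketch below. The channel names are illustrative, and `replicating` is in fact Flume's default selector; the essential change is that the source lists one channel per sink, so every event is copied to each of them:

```
# Sketch: one channel per sink, source fans out to all of them
Agent.channels = MemChannel FileOutChannel

Agent.sources.Netcat.channels = MemChannel FileOutChannel
Agent.sources.Netcat.selector.type = replicating

Agent.sinks.hdfs-sink.channel = MemChannel
Agent.sinks.LocalOut.channel = FileOutChannel
```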
