如何使用flume将数据实时写入hdfs?

o4tp2gmn  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(523)

我使用flume在hdfs中存储传感器数据。一旦通过mqtt接收到数据。订阅者以json格式将数据发布到flume http侦听器。它目前工作正常,但问题是flume在停止hdfs文件之前(或者文件大小达到128mb)不会写入hdfs文件。我正在使用配置单元在读取时应用架构。不幸的是,生成的配置单元表只包含1个条目。这是正常的,因为flume没有将新的数据写入文件(由hive加载)。
有没有办法强迫flume以近乎实时的方式将新的数据写入hdfs?所以,我不需要重新启动它或使用小文件?
这是我的Flume配置:


# Name the components on this agent

emsFlumeAgent.sources = http_emsFlumeAgent
emsFlumeAgent.sinks = hdfs_sink
emsFlumeAgent.channels = channel_hdfs

# Describe/configure the source

emsFlumeAgent.sources.http_emsFlumeAgent.type = http
emsFlumeAgent.sources.http_emsFlumeAgent.bind = localhost
emsFlumeAgent.sources.http_emsFlumeAgent.port = 41414

# Describe the sink

emsFlumeAgent.sinks.hdfs_sink.type = hdfs
emsFlumeAgent.sinks.hdfs_sink.hdfs.path = hdfs://localhost:9000/EMS/%{sensor}
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollInterval = 0
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollSize = 134217728
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollCount=0

# emsFlumeAgent.sinks.hdfs_sink.hdfs.idleTimeout=20

# Use a channel which buffers events in memory

emsFlumeAgent.channels.channel_hdfs.type = memory
emsFlumeAgent.channels.channel_hdfs.capacity = 10000
emsFlumeAgent.channels.channel_hdfs.transactionCapacity = 100

# Bind the source and sinks to the channel

emsFlumeAgent.sources.http_emsFlumeAgent.channels = channel_hdfs 
emsFlumeAgent.sinks.hdfs_sink.channel = channel_hdfs
hs1ihplo

hs1ihplo1#

我认为这里的棘手之处在于,您希望以近乎实时的方式将数据写入hdfs,但也不希望使用小文件(出于明显的原因),这可能是一件很难实现的事情。
您需要在以下两个参数之间找到最佳平衡: hdfs.rollSize (Default = 1024) -触发滚动的文件大小,以字节为单位(0:从不根据文件大小滚动)
hdfs.batchSize (Default = 100) -在刷新到hdfs之前写入文件的事件数
如果您的数据不太可能在首选的持续时间内达到128 mb,那么您可能需要减少 rollSize 但前提是你不会遇到小文件的问题。
由于您没有在hdfs接收器中设置任何批处理大小,因此每100条记录刷新一次hdfs应该会看到刷新的结果,但一旦刷新的记录的大小共同达到128mb,内容将被卷起到128mb文件中。这也没有发生吗?你能确认一下吗?
希望这有帮助!

相关问题