How to keep the original file names when writing to HDFS with Flume's spooling directory source

huwehgph posted on 2021-06-02 in Hadoop

I am using Flume's spooling directory source to write to HDFS. Here is my configuration:


# initialize agent's source, channel and sink

agent.sources = test
agent.channels = memoryChannel
agent.sinks = flumeHDFS

# Setting the source to spool directory where the file exists

agent.sources.test.type = spooldir
agent.sources.test.spoolDir = /johir
agent.sources.test.fileHeader = false
agent.sources.test.fileSuffix = .COMPLETED

# Setting the channel to memory

agent.channels.memoryChannel.type = memory

# Max number of events stored in the memory channel

agent.channels.memoryChannel.capacity = 10000

# agent.channels.memoryChannel.batchSize = 15000

# the key is case-sensitive (transactionCapacity) and its value must not exceed capacity

agent.channels.memoryChannel.transactionCapacity = 1000

# Setting the sink to HDFS

agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.path =/user/root/
agent.sinks.flumeHDFS.hdfs.fileType = DataStream

# Write format can be text or writable

agent.sinks.flumeHDFS.hdfs.writeFormat = Text

# use a single csv file at a time

agent.sinks.flumeHDFS.hdfs.maxOpenFiles = 1

# roll over the file once it reaches about 1 MB (rollSize is given in bytes)

agent.sinks.flumeHDFS.hdfs.rollCount=0
agent.sinks.flumeHDFS.hdfs.rollInterval=0
agent.sinks.flumeHDFS.hdfs.rollSize = 1000000
agent.sinks.flumeHDFS.hdfs.batchSize =1000

# never rollover based on the number of events

agent.sinks.flumeHDFS.hdfs.rollCount = 0

# rollover file based on max time of 1 min

# agent.sinks.flumeHDFS.hdfs.rollInterval = 0

# agent.sinks.flumeHDFS.hdfs.idleTimeout = 600

# Connect source and sink with channel

agent.sources.test.channels = memoryChannel
agent.sinks.flumeHDFS.channel = memoryChannel

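The problem is that the data written to HDFS ends up in a file with a random tmp name. How can I make the files in HDFS keep the original file names from the source directory? For example, I have the files day1.txt, day2.txt and day3.txt, which hold data for different days. I want them stored in HDFS as day1.txt, day2.txt and day3.txt. Instead, the three files are merged and stored in HDFS as a single file, FlumeData.1464629158164.tmp. Is there any way to do this?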

yrwegjxp #1

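To keep the original file name, attach the file name as a header to each event.
Set the basenameHeader property of the source to true. This adds a header with the key basename to every event, unless the basenameHeaderKey property sets a different key.
Then use the hdfs.filePrefix property of the sink to build the HDFS file name from the value of that header.
Add the following properties to the configuration file: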


# source properties

agent.sources.test.basenameHeader = true

# sink properties

agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.filePrefix = %{basename}
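
With the prefix in place, the roll settings from the question would still split any source file larger than rollSize into several HDFS files. The fragment below is a rough sketch of one way to get a single HDFS file per source file, not part of the original answer. It assumes the agent, source, and sink names from the question; the 60-second idleTimeout is an arbitrary illustrative value.

```properties
# Sketch only: the values below are illustrative assumptions.

# Source: attach the file's base name to every event under the "basename" header
agent.sources.test.basenameHeader = true
agent.sources.test.basenameHeaderKey = basename

# Sink: build the HDFS file name from that header
agent.sinks.flumeHDFS.hdfs.filePrefix = %{basename}

# Disable size-, count- and time-based rolling so one source file is not split
agent.sinks.flumeHDFS.hdfs.rollSize = 0
agent.sinks.flumeHDFS.hdfs.rollCount = 0
agent.sinks.flumeHDFS.hdfs.rollInterval = 0

# Close a file after 60 seconds without writes (assumed value);
# closing the file is what removes the in-use .tmp suffix
agent.sinks.flumeHDFS.hdfs.idleTimeout = 60
```

Note that the HDFS sink still appends a numeric counter to the prefix, so the closed files should be named along the lines of day1.txt.1464629158164 rather than exactly day1.txt. If the exact original name is required, a rename step after the file is closed would likely be needed.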
