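I am currently using Flume version 1.5.2.
Flume creates an empty line at the end of each output file in HDFS, which causes the line counts, file sizes, and checksums of the source and destination files to differ.
I tried overriding the default values of the rollSize, batchSize, and appendNewline parameters, but it still does not work.
Flume also changes the EOL from CRLF (source file) to LF (output file), which likewise makes the file sizes differ.
Below are the relevant Flume agent configuration parameters I am using: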
agent1.sources = c1
agent1.sinks = c1s1
agent1.channels = ch1
agent1.sources.c1.type = spooldir
agent1.sources.c1.spoolDir = /home/biadmin/flume-test/sourcedata1
agent1.sources.c1.bufferMaxLineLength = 80000
agent1.sources.c1.channels = ch1
agent1.sources.c1.fileHeader = true
agent1.sources.c1.fileHeaderKey = file
#agent1.sources.c1.basenameHeader = true
#agent1.sources.c1.fileHeaderKey = basenameHeaderKey
#agent1.sources.c1.filePrefix = %{basename}
agent1.sources.c1.inputCharset = UTF-8
agent1.sources.c1.decodeErrorPolicy = IGNORE
agent1.sources.c1.deserializer= LINE
agent1.sources.c1.deserializer.maxLineLength = 50000
agent1.sources.c1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
agent1.sources.c1.interceptors = a b
agent1.sources.c1.interceptors.a.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
agent1.sources.c1.interceptors.b.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.c1.interceptors.b.preserveExisting = false
agent1.sources.c1.interceptors.b.hostHeader = host
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000
agent1.channels.ch1.transactionCapacity = 1000
agent1.channels.ch1.batchSize = 1000
agent1.channels.ch1.maxFileSize = 2073741824
agent1.channels.ch1.keep-alive = 5
agent1.sinks.c1s1.type = hdfs
agent1.sinks.c1s1.hdfs.path = hdfs://bivm.ibm.com:9000/user/biadmin/flume/%y-%m-%d/%H%M
agent1.sinks.c1s1.hdfs.fileType = DataStream
agent1.sinks.c1s1.hdfs.filePrefix = %{file}
agent1.sinks.c1s1.hdfs.fileSuffix =.csv
agent1.sinks.c1s1.hdfs.writeFormat = Text
agent1.sinks.c1s1.hdfs.maxOpenFiles = 10
agent1.sinks.c1s1.hdfs.rollSize = 67000000
agent1.sinks.c1s1.hdfs.rollCount = 0
# agent1.sinks.c1s1.hdfs.rollInterval = 0
agent1.sinks.c1s1.hdfs.batchSize = 1000
agent1.sinks.c1s1.channel = ch1
# agent1.sinks.c1s1.hdfs.codeC = snappyCodec
agent1.sinks.c1s1.hdfs.serializer = text
agent1.sinks.c1s1.hdfs.serializer.appendNewline = false
Setting hdfs.serializer.appendNewline did not fix the problem.
Could someone please take a look and suggest a fix?
2 answers
aoyhnmkz1#
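Replace the two hdfs.serializer lines in the sink configuration (agent1.sinks.c1s1.hdfs.serializer and agent1.sinks.c1s1.hdfs.serializer.appendNewline) with the same settings written without the hdfs. prefix.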
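The difference is that the serializer settings are set directly on the sink name, not under the hdfs prefix.
The Flume documentation should include some examples of this. I ran into the same problem because I had not noticed that the serializer settings live at a different property-name level.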
For more information about the HDFS sink, see: https://flume.apache.org/FlumeUserGuide.html#hdfs-sink
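As a sketch of the change this answer describes, assuming the agent and sink names from the question (agent1, c1s1), the two serializer lines would move from the hdfs. prefix to the sink level. The commented-out lines are the question's original settings, shown only for comparison:

```properties
# Before: serializer keys placed under the hdfs. prefix (as in the question).
# These are not read as serializer settings by the HDFS sink.
# agent1.sinks.c1s1.hdfs.serializer = text
# agent1.sinks.c1s1.hdfs.serializer.appendNewline = false

# After: serializer keys set directly on the sink name.
agent1.sinks.c1s1.serializer = text
agent1.sinks.c1s1.serializer.appendNewline = false
```

After restarting the agent, comparing the line count or checksum of a newly written HDFS file against its source file is a quick way to check whether the trailing newline is gone.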
sq1bmfud2#
Replace the line below in your Flume agent configuration.
Use the line below instead, and let me know how it goes.