谢谢你看这个
我正在尝试使用flume-1.6收集日志。但我发现并不是所有的日志文件都是使用假脱机控制器源代码摄取的。请给出你的建议!!
在一个测试中,我在日志文件(170m)中有369189行,但在另一端只收到169335行。当我检查flume.log时,它会说已经到达文件的末尾,并将.completed添加到原始日志文件中。
我尝试使用不同的日志文件,大约有300000行,另一端接收52410条记录。
以下是背景和配置:
日志文件大小约为200m。
flume配置了spooldir源、文件通道和kafka接收器,如下所示:
# agent definition
log_agent.sources = spooldirSrc
log_agent.channels = fileChannel
log_agent.sinks = kafkaSink
log_agent.sources.spooldirSrc.channels = fileChannel
log_agent.sinks.kafkaSink.channel = fileChannel
# source define
log_agent.sources.spooldirSrc.type = spooldir
log_agent.sources.spooldirSrc.spoolDir=/log_path/
# kafkaSink definition
log_agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
log_agent.sinks.kafkaSink.topic=log-topic
log_agent.sinks.kafkaSink.brokerList=kafka-host-1:9092,kafka-host-1:9092,kafka-host-1:9092
log_agent.sinks.kafkaSink.requiredAcks=1
log_agent.sinks.kafkaSink.batchSize=100
# fileChannel definition
log_agent.channels.fileChannel.type=file
log_agent.channels.fileChannel.checkpointDir=/path/checkpoint/
log_agent.channels.fileChannel.dataDirs=/path/data
log_agent.channels.fileChannel.capacity=100000
我读过flume文档,spooldir源默认使用行反序列化器。我下载了flume-1.6源代码,并在文件读取的末尾添加了打印行到reliablespoolingfileeventreader,后者负责从日志中读取。似乎读者在到达eof之前就过早地结束了。
任何建议都将不胜感激!
1条答案
按热度按时间mctunoxg1#
最后,我找到了原因。linedeserializer依赖resettablefileinputstream从文件中读取。resettablefileinputstream在读取多码点unicode字符时出现问题。在flume1.7中解决了这个问题。
我用flume1.7的源代码替换了flume1.6 resettablefileinputstream,重新编译并替换了flume1.6/lib中的flume-ng-core-1.6.0.jar。然后spooldir源可以使用文件的所有内容。
希望这能对遇到同样怪诞问题的人有所帮助。
请参考apache发布站点。