flume1.6spooldir源代码只接收文件的一部分

9udxz4iz  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(274)

谢谢你看这个
我正在尝试使用flume-1.6收集日志。但我发现并不是所有的日志文件都是使用假脱机控制器源代码摄取的。请给出你的建议!!
在一个测试中,我在日志文件(170m)中有369189行,但在另一端只收到169335行。当我检查flume.log时,它会说已经到达文件的末尾,并将.completed添加到原始日志文件中。
我尝试使用不同的日志文件,大约有300000行,另一端接收52410条记录。
以下是背景和配置:
日志文件大小约为200m。
flume配置了spooldir源、文件通道和kafka接收器,如下所示:


# agent definition

log_agent.sources = spooldirSrc 
log_agent.channels = fileChannel 
log_agent.sinks = kafkaSink 

log_agent.sources.spooldirSrc.channels = fileChannel
log_agent.sinks.kafkaSink.channel = fileChannel

# source define

log_agent.sources.spooldirSrc.type = spooldir
log_agent.sources.spooldirSrc.spoolDir=/log_path/

# kafkaSink definition

log_agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
log_agent.sinks.kafkaSink.topic=log-topic
log_agent.sinks.kafkaSink.brokerList=kafka-host-1:9092,kafka-host-1:9092,kafka-host-1:9092
log_agent.sinks.kafkaSink.requiredAcks=1
log_agent.sinks.kafkaSink.batchSize=100

# fileChannel definition

log_agent.channels.fileChannel.type=file
log_agent.channels.fileChannel.checkpointDir=/path/checkpoint/
log_agent.channels.fileChannel.dataDirs=/path/data
log_agent.channels.fileChannel.capacity=100000

我读过flume文档,spooldir源默认使用行反序列化器。我下载了flume-1.6源代码,并在文件读取的末尾添加了打印行到reliablespoolingfileeventreader,后者负责从日志中读取。似乎读者在到达eof之前就过早地结束了。
任何建议都将不胜感激!

mctunoxg

mctunoxg1#

最后,我找到了原因。linedeserializer依赖resettablefileinputstream从文件中读取。resettablefileinputstream在读取多码点unicode字符时出现问题。在flume1.7中解决了这个问题。
我用flume1.7的源代码替换了flume1.6 resettablefileinputstream,重新编译并替换了flume1.6/lib中的flume-ng-core-1.6.0.jar。然后spooldir源可以使用文件的所有内容。
希望这能对遇到同样怪诞问题的人有所帮助。
请参考apache发布站点。

相关问题