如何在sink失败后强制flume ng处理积压的事件?

idfiyjo8  于 2021-06-04  发布在  Hadoop
关注(0)|答案(1)|浏览(385)

我正在尝试设置flume ng,从一堆服务器(主要运行tomcat示例和apachehttpd)收集各种日志,并将它们转储到5节点hadoop集群上的hdfs中。设置如下所示:

每个应用服务器都将相关的日志跟踪到一个exec源中(每个日志类型对应一个:java、httpd、syslog),然后通过filechannel将它们输出到avro接收器。在每台服务器上,不同的源、通道和接收器由一个代理管理。事件由驻留在hadoop集群(同时承载secondarynamenode和jobtracker的节点)上的avrosource获取。对于每个日志类型,都有一个avrosource监听不同的端口。事件通过filechannel进入hdfs接收器,hdfs接收器使用flumeeventavro事件序列化程序和snapy压缩保存事件。
问题是:hadoop节点上管理hdfs接收器(同样,每个日志类型一个)的代理在几个小时后失败,因为我们没有更改jvm的堆大小。从那时起,在该节点上的filechannel中收集了大量事件,之后在应用程序服务器上的filechannels中也收集了大量事件,因为hadoop节点上的filechannel达到了它的最大容量。当我修复这个问题时,我无法让hadoop节点上的代理足够快地处理积压工作,这样它就可以恢复正常操作。filechannel在下沉事件之前保存事件的tmp dir的大小一直在增长。而且,hdfs的写入速度似乎非常慢。有没有办法强制flume在接收新事件之前先处理积压工作?以下配置是否最佳?可能与此相关:写入hdfs的文件非常小,大约1-3mb左右。对于hdfs的默认块大小为64mb以及未来的mr操作,这肯定不是最佳的。我应该使用什么设置来收集文件中的事件,这些文件的大小足以容纳hdfs块大小?我感觉hadoop节点上的配置不对,我怀疑batchsize、rollcount和相关参数的值是关闭的,但我不确定最佳值应该是什么。

应用程序服务器上的配置示例:

agent.sources=syslogtail httpdtail javatail
agent.channels=tmpfile-syslog tmpfile-httpd tmpfile-java
agent.sinks=avrosink-syslog avrosink-httpd avrosink-java

agent.sources.syslogtail.type=exec
agent.sources.syslogtail.command=tail -F /var/log/messages
agent.sources.syslogtail.interceptors=ts
agent.sources.syslogtail.interceptors.ts.type=timestamp
agent.sources.syslogtail.channels=tmpfile-syslog
agent.sources.syslogtail.batchSize=1

...

agent.channels.tmpfile-syslog.type=file
agent.channels.tmpfile-syslog.checkpointDir=/tmp/flume/syslog/checkpoint
agent.channels.tmpfile-syslog.dataDirs=/tmp/flume/syslog/data

...

agent.sinks.avrosink-syslog.type=avro
agent.sinks.avrosink-syslog.channel=tmpfile-syslog
agent.sinks.avrosink-syslog.hostname=somehost
agent.sinks.avrosink-syslog.port=XXXXX
agent.sinks.avrosink-syslog.batch-size=1

hadoop节点上的配置示例

agent.sources=avrosource-httpd avrosource-syslog avrosource-java
agent.channels=tmpfile-httpd tmpfile-syslog tmpfile-java
agent.sinks=hdfssink-httpd hdfssink-syslog hdfssink-java

agent.sources.avrosource-java.type=avro
agent.sources.avrosource-java.channels=tmpfile-java
agent.sources.avrosource-java.bind=0.0.0.0
agent.sources.avrosource-java.port=XXXXX

...

agent.channels.tmpfile-java.type=file
agent.channels.tmpfile-java.checkpointDir=/tmp/flume/java/checkpoint
agent.channels.tmpfile-java.dataDirs=/tmp/flume/java/data
agent.channels.tmpfile-java.write-timeout=10
agent.channels.tmpfile-java.keepalive=5
agent.channels.tmpfile-java.capacity=2000000

...

agent.sinks.hdfssink-java.type=hdfs
agent.sinks.hdfssink-java.channel=tmpfile-java
agent.sinks.hdfssink-java.hdfs.path=/logs/java/avro/%Y%m%d/%H
agent.sinks.hdfssink-java.hdfs.filePrefix=java-
agent.sinks.hdfssink-java.hdfs.fileType=DataStream
agent.sinks.hdfssink-java.hdfs.rollInterval=300
agent.sinks.hdfssink-java.hdfs.rollSize=0
agent.sinks.hdfssink-java.hdfs.rollCount=40000
agent.sinks.hdfssink-java.hdfs.batchSize=20000
agent.sinks.hdfssink-java.hdfs.txnEventMax=20000
agent.sinks.hdfssink-java.hdfs.threadsPoolSize=100
agent.sinks.hdfssink-java.hdfs.rollTimerPoolSize=10
uqjltbpv

uqjltbpv1#

我在您的配置中看到了一些可能导致问题的因素:
您的第一个代理似乎有一个批处理大小为1的avro接收器。你应该把这个增加到至少100或更多。这是因为第二个代理上的avro源将提交到批大小为1的通道。每次提交都会导致fsync,从而导致文件通道性能较差。exec源上的批处理大小也是1,导致通道也变慢。您可以增加批处理大小(或者使用spool目录源-稍后详细介绍)。
您可以让多个hdfs接收器从同一通道读取数据,以提高性能。您应该确保每个接收器写入不同的目录或具有不同的“hdfs.fileprefix”,这样多个hdfs接收器就不会尝试写入相同的文件。
hdfs接收器的批处理大小是20000,这相当高,而calltimeout的默认值是10秒。如果要保持如此大的批处理大小,应该增加“hdfs.calltimeout”。我建议将批处理大小减少到1000左右,超时时间大约为15-20秒(请注意,在当前批处理大小下,每个文件只包含2个批处理—因此请减小批处理大小,增加rollinterval和timeout)
如果您使用的是tail-f,我建议您尝试使用新的spool目录源代码。要使用此源,请将日志文件旋转到spool目录源处理的目录。此源只处理不可变的文件,因此需要将日志文件转出。在exec源代码中使用tail-f有问题,如flume用户指南中所述。

相关问题