Filtering log files in Flume using interceptors

ghhaqwfi  posted on 2021-06-03 in Hadoop

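I have an HTTP server that writes log files, which I then load into HDFS using Flume. First, I want to filter the data based on values in the header or the body. I read that I can do this with an interceptor and a regex. Can someone explain exactly what I need to do? Do I need to write Java code that overrides Flume code?
Also, I would like to send the data to a different sink depending on a header (i.e. source=1 goes to sink1 and source=2 goes to sink2). How is this done?
Thank you,
Shimon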

u5i3ibmn #1

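You don't need to write Java code to filter events. Use the Regex Filtering Interceptor, which filters events whose body text matches a regular expression. With excludeEvents = true, as below, matching events are dropped: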

agent.sources.logs_source.interceptors = regex_filter_interceptor
agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter
agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = <your regex>
agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = true
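
For the opposite case, keeping only the matching events and dropping everything else, a minimal sketch might look like the following. The pattern `.*(ERROR|WARN).*` is only a placeholder assumption and not something from the question; `excludeEvents = false` is the documented default and is spelled out here just for clarity.

```properties
# Sketch: keep only events whose body matches the regex (placeholder pattern).
agent.sources.logs_source.interceptors = regex_filter_interceptor
agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter
agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = .*(ERROR|WARN).*
# false (the default): matching events are kept, all others are dropped.
agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = false
```

Note that this interceptor only looks at the event body, not at the headers.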

To route events based on a header, use the multiplexing channel selector:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

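Here, events whose "state" header is "CZ" go to channel "c1", events whose "state" header is "US" go to both "c2" and "c3", and all other events go to "c4".
This way you can also filter events by header: just route the specific header values to a channel that points to a Null Sink.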
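
As a rough sketch of that header-based drop, the fragment below sends one header value to a channel drained by a Null Sink and everything else to a normal channel. The names `c_drop` and `k_drop` and the header value `XX` are hypothetical and chosen only for illustration.

```properties
# Sketch: discard events whose "state" header is XX (hypothetical value).
a1.sources = r1
a1.channels = c1 c_drop
a1.sinks = k_drop
a1.sources.r1.channels = c1 c_drop
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.XX = c_drop
a1.sources.r1.selector.default = c1
a1.channels.c_drop.type = memory
# The Null Sink discards every event it takes from its channel.
a1.sinks.k_drop.type = null
a1.sinks.k_drop.channel = c_drop
```

Channel `c1` would still need its own type and a real sink (for example an HDFS sink) attached to it.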

u2nhd7ah #2

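You can use Flume channel selectors to simply route events to different destinations. Alternatively, you can chain several Flume agents together to implement more complex routing, but chained agents become somewhat hard to maintain (resource usage and Flume topology). You could also have a look at the flume-ng router sink, which may provide some of the functionality you want.
First, add a specific field to the event headers with a Flume interceptor: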

a1.sources = r1 r2
a1.channels = c1 c2
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
a1.sources.r2.channels =  c2
a1.sources.r2.type = seq
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = datacenter
a1.sources.r2.interceptors.i2.value = BERKELEY
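
If the value you want to route on is inside the event body rather than fixed per source, Flume's built-in `regex_extractor` interceptor can copy a captured group into a header instead. A hedged sketch, assuming (hypothetically) that every log line contains a token such as `source=1`; the header name `source` and serializer name `s1` are placeholders:

```properties
# Sketch: copy the digits after "source=" from the body into a "source" header.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
# Backslashes are doubled because this is a Java properties file.
a1.sources.r1.interceptors.i1.regex = source=(\\d+)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = source
```

A multiplexing selector could then use `selector.header = source` with mappings for `1` and `2`.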

Then you can set up the Flume channel selector as follows:

a2.sources = r2
a2.channels = c1 c2 c3 c4
a2.sources.r2.channels = c1 c2 c3 c4
a2.sources.r2.selector.type = multiplexing
a2.sources.r2.selector.header = datacenter
a2.sources.r2.selector.mapping.NEW_YORK = c1
a2.sources.r2.selector.mapping.BERKELEY = c2 c3
a2.sources.r2.selector.default = c4
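
A channel selector only chooses channels; each channel still needs a sink attached to it. Below is a minimal sketch of that wiring for two of the channels, assuming HDFS sinks. The sink names `k1` and `k2` and the `hdfs://` paths are made up for illustration.

```properties
# Sketch: one sink per channel (sink names and paths are hypothetical).
a2.sinks = k1 k2
a2.channels.c1.type = memory
a2.channels.c2.type = memory
a2.sinks.k1.type = hdfs
a2.sinks.k1.channel = c1
a2.sinks.k1.hdfs.path = hdfs://namenode/flume/new_york
a2.sinks.k2.type = hdfs
a2.sinks.k2.channel = c2
a2.sinks.k2.hdfs.path = hdfs://namenode/flume/berkeley
```

The remaining channels (`c3`, `c4`) would be wired to their own sinks in the same way.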

Alternatively, you can set up the Avro router sink, for example:

agent.sinks.routerSink.type = com.datums.stream.AvroRouterSink
agent.sinks.routerSink.hostname = test_host
agent.sinks.routerSink.port = 34541
agent.sinks.routerSink.channel = memoryChannel

# Set sink name
agent.sinks.routerSink.component.name = AvroRouterSink

# Set header name for routing
agent.sinks.routerSink.condition = datacenter

# Set routing conditions
agent.sinks.routerSink.conditions = east,west
agent.sinks.routerSink.conditions.east.if = ^NEW_YORK
agent.sinks.routerSink.conditions.east.then.hostname = east_host
agent.sinks.routerSink.conditions.east.then.port = 34542
agent.sinks.routerSink.conditions.west.if = ^BERKELEY
agent.sinks.routerSink.conditions.west.then.hostname = west_host
agent.sinks.routerSink.conditions.west.then.port = 34543
