我有一个写日志文件的http服务器,然后使用flume将其加载到hdfs中。首先,我想根据头或正文中的数据过滤数据。我读到我可以用一个带有regex的拦截器来做这个,有人能确切地解释我需要做什么吗?我是否需要编写覆盖flume代码的java代码?另外,我想采取的数据,并根据标题发送到另一个接收器(即源=1去sink1和源=2去sink2)这是如何做到的?谢谢您,希蒙
u5i3ibmn1#
您不需要编写java代码来过滤事件。使用regex筛选拦截器筛选正文文本与某些正则表达式匹配的事件:
agent.sources.logs_source.interceptors = regex_filter_interceptor agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = <your regex> agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = true
要基于标头路由事件,请使用多路复用通道选择器:
a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4
在这里,标题为“state”=“cz”的事件转到通道“c1”,标题为“state”=“us”的事件转到“c2”和“c3”,所有其他事件转到“c4”。这样,您还可以按头过滤事件—只需将特定的头值路由到指向空接收器的通道。
u2nhd7ah2#
您可以使用flume频道选择器来简单地将事件路由到不同的目的地。或者可以将几个flume代理链接在一起,以实现复杂的路由功能。但是链接的flume代理将变得有点难以维护(资源使用和flume拓扑)。你可以看看Flumeng路由器Flume,它可能提供一些你想要的功能。首先,通过flume拦截器在事件头中添加特定字段
a1.sources = r1 r2 a1.channels = c1 c2 a1.sources.r1.channels = c1 a1.sources.r1.type = seq a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static a1.sources.r1.interceptors.i1.key = datacenter a1.sources.r1.interceptors.i1.value = NEW_YORK a1.sources.r2.channels = c2 a1.sources.r2.type = seq a1.sources.r2.interceptors = i2 a1.sources.r2.interceptors.i2.type = static a1.sources.r2.interceptors.i2.key = datacenter a1.sources.r2.interceptors.i2.value = BERKELEY
然后,您可以设置Flume通道选择器,如下所示:
a2.sources = r2 a2.sources.channels = c1 c2 c3 c4 a2.sources.r2.selector.type = multiplexing a2.sources.r2.selector.header = datacenter a2.sources.r2.selector.mapping.NEW_YORK = c1 a2.sources.r2.selector.mapping.BERKELEY= c2 c3 a2.sources.r2.selector.default = c4
或者,您可以设置avro路由器接收器,如:
agent.sinks.routerSink.type = com.datums.stream.AvroRouterSink agent.sinks.routerSink.hostname = test_host agent.sinks.routerSink.port = 34541 agent.sinks.routerSink.channel = memoryChannel # Set sink name agent.sinks.routerSink.component.name = AvroRouterSink # Set header name for routing agent.sinks.routerSink.condition = datacenter # Set routing conditions agent.sinks.routerSink.conditions = east,west agent.sinks.routerSink.conditions.east.if = ^NEW_YORK agent.sinks.routerSink.conditions.east.then.hostname = east_host agent.sinks.routerSink.conditions.east.then.port = 34542 agent.sinks.routerSink.conditions.west.if = ^BERKELEY agent.sinks.routerSink.conditions.west.then.hostname = west_host agent.sinks.routerSink.conditions.west.then.port = 34543
2条答案
按热度按时间u5i3ibmn1#
您不需要编写java代码来过滤事件。使用regex筛选拦截器筛选正文文本与某些正则表达式匹配的事件:
要基于标头路由事件,请使用多路复用通道选择器:
在这里,标题为“state”=“cz”的事件转到通道“c1”,标题为“state”=“us”的事件转到“c2”和“c3”,所有其他事件转到“c4”。
这样,您还可以按头过滤事件—只需将特定的头值路由到指向空接收器的通道。
u2nhd7ah2#
您可以使用flume频道选择器来简单地将事件路由到不同的目的地。或者可以将几个flume代理链接在一起,以实现复杂的路由功能。但是链接的flume代理将变得有点难以维护(资源使用和flume拓扑)。你可以看看Flumeng路由器Flume,它可能提供一些你想要的功能。
首先,通过flume拦截器在事件头中添加特定字段
然后,您可以设置Flume通道选择器,如下所示:
或者,您可以设置avro路由器接收器,如: