如何在flume中同时使用regex\u提取器选择器和多路复用拦截器?

fruv7luv  于 2021-06-03  发布在  Flume
关注(0)|答案(1)|浏览(377)

我正在测试flume将数据加载到hhase中,并考虑使用flume的选择器和interceptor进行并行数据加载,因为源和汇之间存在速度差。
所以,我想用Flume做的是
使用拦截器的regex\u提取器类型创建事件头
将具有标头的事件多路传输到具有选择器多路传输类型的两个以上通道
在一个源-通道-接收器中。
并尝试如下配置。

agent.sources = tailsrc
    agent.channels = mem1 mem2
    agent.sinks = std1 std2
    agent.sources.tailsrc.type = exec
    agent.sources.tailsrc.command = tail -F /home/flumeuser/test/in.txt
    agent.sources.tailsrc.batchSize = 1

    agent.sources.tailsrc.interceptors = i1
    agent.sources.tailsrc.interceptors.i1.type = regex_extractor
    agent.sources.tailsrc.interceptors.i1.regex = ^(\\d)
    agent.sources.tailsrc.interceptors.i1.serializers = t1
    agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type

    agent.sources.tailsrc.selector.type = multiplexing
    agent.sources.tailsrc.selector.header = type
    agent.sources.tailsrc.selector.mapping.1 = mem1
    agent.sources.tailsrc.selector.mapping.2 = mem2

    agent.sinks.std1.type = file_roll
    agent.sinks.std1.channel = mem1
    agent.sinks.std1.batchSize = 1
    agent.sinks.std1.sink.directory = /var/log/flumeout/1
    agent.sinks.std1.rollInterval = 0

    agent.sinks.std2.type = file_roll
    agent.sinks.std2.channel = mem2
    agent.sinks.std2.batchSize = 1
    agent.sinks.std2.sink.directory = /var/log/flumeout/2
    agent.sinks.std2.rollInterval = 0

    agent.channels.mem1.type = memory
    agent.channels.mem1.capacity = 100

    agent.channels.mem2.type = memory
    agent.channels.mem2.capacity = 100

但是,它不起作用!
当选择器部分被移除时,flume的日志中有一些拦截器调试消息。但当选择器和拦截器在一起时,就什么都没有了。
有没有什么错误的表达或者我遗漏了什么?
感谢阅读。:)

rbpvctlc

rbpvctlc1#

我找到了。
Flume日志中有如下警告信息。

2013-10-10 16:34:20,514 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:571)] Removed tailsrc due to Failed to configure component!

所以我把它附在下面

agent.sources.tailsrc.channels = mem1 mem2

然后它就工作了!!!!

相关问题