如何在Flume1.7中编写自定义esFlume

nom7f22z  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(382)

在flume代理中,我从kafka主题收集元素,需要将它们插入es。但是,我需要在接收器中执行之前的消化过程,因此我需要编写一个自定义接收器,以便将数据从代理的通道传递到java消化模块(我已经编写了这个模块)。
有谁能和我分享一个定制Flume的模板,并作为参考?flumes官方网站对此没有做太多说明:启动flume代理时,代理的类路径中必须包含自定义接收器的类及其依赖项。自定义接收器的类型是其fqcn。https://flume.apache.org/flumeuserguide.html#custom-Flume
一旦定制接收器就绪,如何链接以下三个文件以使代理工作:
定制Flume
摄取jar(执行摄取过程的java模块)
flumeagent.属性
感谢您的反馈。我将继续添加信息,一旦我在这个任务的进展。

ht4b089n

ht4b089n1#

希望您正在尝试使用flume接收来自kafka(源)的事件,并将其转发到es(sink),其中已有一些数据处理逻辑。
基于这种理解,我建议你研究一下flume拦截器,它负责在发送到sink之前改变/过滤正在运行的事件。
因此,更改事件的所有业务逻辑都可以实现为自定义拦截器,并且应该配置为flume通道。
作为参考,您可以检查本机拦截器源代码已经可用。这可能会给你一个关于Flume拦截器框架的想法。
这是es接收器源代码
Flume配置示例

a1.sources = kafkaSource
a1.sinks = ES_Sink
a1.channels = channel1

a1.sources.kafkaSource.interceptors = i1
a1.sources.kafkaSource.interceptors.i1.type = org.apache.flume.interceptor.<Custom_Interceptor_name>$Builder

a1.sinks.ES_Sink.channel = channel1
a1.sinks.ES_Sink.type = elasticsearch
a1.sinks.ES_Sink.hostNames = 127.0.0.1:9200

相关问题