Configuring Apache Flume with an Elasticsearch sink

xggvc2p6  posted on 2021-06-04  in  Flume

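This is my first time posting here, so I apologize if I am not posting correctly, and for my poor English.
I am trying to configure Apache Flume with an Elasticsearch sink. Everything seems to run fine, but two messages (one ERROR and one WARN) are logged when the agent starts: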

2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies

My agent configuration:


# Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink ES

a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The netcat source starts and everything appears to work, but I am worried about these messages and do not understand them.

qcbq4gxm  #1

As the log says, there is a missing-dependency problem.
If you look at the ElasticSearchSink documentation, you will see the following:

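The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version of the JVM. SerializationExceptions will appear if this is incorrect. To select the required version first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client library which matches the major version. A 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the elasticsearch version has been determined then read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent which is running the ElasticSearchSink should also match the JVM the target cluster is running down to the minor version.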

Most likely you have not added the required JARs, or their versions do not match your cluster.
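One way to check which client versions Flume is actually loading is to read them off the JAR file names in its lib directory. The sketch below is only an illustration: the function names are made up for this example, and it assumes the JARs follow the usual `name-x.y.z.jar` naming.

```shell
#!/bin/sh
# Print the version embedded in a JAR file name,
# e.g. elasticsearch-1.7.1.jar -> 1.7.1
jar_version() {
  basename "$1" .jar | sed 's/^.*-\([0-9][0-9.]*\)$/\1/'
}

# Print only the major version, e.g. lucene-core-4.10.4.jar -> 4
jar_major() {
  jar_version "$1" | cut -d. -f1
}

# List the Elasticsearch and lucene-core JARs found in a Flume lib directory,
# one per line, as "<file name><TAB><version>".
# The directory is whatever you pass in; nothing is assumed about its location.
list_es_jars() {
  for jar in "$1"/elasticsearch-*.jar "$1"/lucene-core-*.jar; do
    [ -e "$jar" ] || continue   # skip patterns that matched nothing
    printf '%s\t%s\n' "$(basename "$jar")" "$(jar_version "$jar")"
  done
}
```

Calling `list_es_jars` on your Flume lib directory and comparing the result with the `version.number` field returned by `curl -s http://127.0.0.1:9200/` (assuming the cluster's HTTP port is the default 9200) shows whether the client and server major versions agree.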

pkwftd7m  #2

I added only the following two JARs to the flume/lib directory and it worked; there was no need to add all the other Lucene JARs:
elasticsearch-1.7.1.jar
lucene-core-4.10.4.jar
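Copying just those two JARs could be scripted roughly as follows. This is a sketch under assumptions: `install_es_jars` is a hypothetical helper, and both directory arguments are placeholders for wherever your Elasticsearch distribution's lib directory and your Flume lib directory live.

```shell
#!/bin/sh
# Copy the Elasticsearch client JAR and the lucene-core JAR, and nothing else,
# into Flume's lib directory.
# Usage: install_es_jars <elasticsearch-lib-dir> <flume-lib-dir>
install_es_jars() {
  src=$1
  dest=$2
  for jar in "$src"/elasticsearch-*.jar "$src"/lucene-core-*.jar; do
    if [ -e "$jar" ]; then
      cp "$jar" "$dest"/
    else
      # The glob matched nothing, so a required JAR is not in the source directory.
      echo "missing: $jar" >&2
      return 1
    fi
  done
}
```

The other Lucene JARs in the source directory are deliberately left alone, matching the observation above that only these two were needed.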
Command to start Flume:

bin/flume-ng agent --conf conf --conf-file conf/flume-aggregator.conf --name agent2 -Dflume.root.logger=INFO,console

Make sure to add the following to flume-env.sh:

export JAVA_HOME=/usr/java/default

export JAVA_OPTS="-Xms3072m -Xmx3072m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

# Classpath entries are separated by ':' on Linux (';' is the Windows separator)
FLUME_CLASSPATH="/usr/flume/flume1.6/apache-flume-1.6.0-bin/:/usr/flume/flume1.6/apache-flume-1.6.0-bin/lib"

Flume aggregator configuration that loads the data into Elasticsearch (flume-aggregator.conf):

agent2.sources = source1
agent2.sinks = sink1
agent2.channels = channel1

################################################ 

# Describe Source

################################################ 

# Source Avro

agent2.sources.source1.type = avro
agent2.sources.source1.bind = 0.0.0.0 
agent2.sources.source1.port = 9997

################################################ 

# Describe Interceptors

################################################ 

# an example of nginx access log regex match

# agent2.sources.source1.interceptors = interceptor1

# agent2.sources.source1.interceptors.interceptor1.type = regex_extractor

# 

# agent2.sources.source1.interceptors.interceptor1.regex = "^(\\S+) \\[(.*?)\\] \"(.*?)\" (\\S+) (\\S+)( \"(.*?)\" \"(.*?)\")?"

# 

# # agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z\\.\\@\\-\\+_%]+) ([a-zA-Z\\.\\@\\-\\+_%]+) \\[(.*)\\] \\"(POST|GET) ([A-Za-z0-9\\$\\.\\+\\@#%_\\/\\-]*)\\??(.*) (.*)\\" ([a-zA-Z0-9\\.\\/\\s\-]*) (.*) ([0-9]+) ([0-9]+) ([0-9\\.]+)

# # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13

# 

# agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8

# agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip

# agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = datetime

# agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = method

# agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = request

# agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = response

# agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = status

# agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = bytes

# agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = requesttime

# 

################################################ 

# Describe Sink

################################################ 

# Sink ElasticSearch

# Copy the Elasticsearch lib JARs into flume/lib

# clusterName below must match cluster.name in elasticsearch/config/elasticsearch.yml; the index data is stored under data/<cluster name>.

agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent2.sinks.sink1.hostNames = 10.20.156.16:9300,10.20.176.20:9300
agent2.sinks.sink1.indexName = pdi
agent2.sinks.sink1.indexType = pdi_metrics
agent2.sinks.sink1.clusterName = My-ES-CLUSTER
agent2.sinks.sink1.batchSize = 1000
agent2.sinks.sink1.ttl = 2

# this serializer is crucial in order to use kibana

agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer

################################################ 

# Describe Channel

################################################ 

# Channel Memory

agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 10000000
agent2.channels.channel1.transactionCapacity = 1000

################################################ 

# Bind the source and sink to the channel

################################################ 

agent2.sources.source1.channels = channel1
agent2.sinks.sink1.channel = channel1

acruukt9  #3

I found the cause: it seems that Apache Flume 1.6.0 and Elasticsearch 2.0 cannot communicate properly.
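The descriptor in the stack trace, `InetSocketTransportAddress.<init>(Ljava/lang/String;I)V`, refers to a constructor taking a `String` and an `int`. A rough way to confirm that the Elasticsearch JAR on Flume's classpath lacks that constructor is to inspect the class with `javap`. In this sketch `has_string_int_ctor` is a hypothetical helper and the JAR path in the comment is a placeholder.

```shell
#!/bin/sh
# Reads `javap` output on stdin and succeeds only if the class declares the
# (String, int) constructor that Flume's bundled sink calls.
has_string_int_ctor() {
  grep -qF 'InetSocketTransportAddress(java.lang.String, int)'
}

# Example invocation (replace the JAR path with the one in your Flume lib directory):
#   javap -cp /path/to/flume/lib/elasticsearch-<version>.jar \
#       org.elasticsearch.common.transport.InetSocketTransportAddress \
#     | has_string_int_ctor && echo "constructor present" || echo "constructor missing"
```

If the check reports the constructor as missing, the sink and the client JAR were built against different Elasticsearch APIs, which is consistent with the `NoSuchMethodError` in the question.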
I found a good third-party sink, which I modified.
Here is the link.
And this is my final configuration:


# Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink ES

a1.sinks = k1
a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink
a1.sinks.k1.hostNames = 127.0.0.1:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer
a1.sinks.k1.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

This works for me.
Thanks for your answers.
P.S. Yes, I had to move the libraries.
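To check that events really reach Elasticsearch with a configuration like the one above, a quick smoke test could look like the sketch below. Several things here are assumptions rather than facts from this thread: that the third-party index name builder, like upstream Flume's, appends the UTC date in yyyy-MM-dd form to `indexName`; that Elasticsearch's HTTP API listens on the default port 9200; and that `nc` and `curl` are installed. Both function names are made up for this example.

```shell
#!/bin/sh
# Name of today's index, assuming "<indexName>-yyyy-MM-dd" in UTC
# (upstream Flume's documented default; the fork may differ).
todays_index() {
  printf '%s-%s\n' "$1" "$(date -u +%Y-%m-%d)"
}

# Send one line to the netcat source, then search today's index for it.
# Port 44444 and the index name "items" come from the configuration above;
# the HTTP port 9200 is an assumption.
smoke_test() {
  echo 'hello from flume' | nc -w 1 localhost 44444
  sleep 5   # give the sink time to take the event from the channel
  curl -s "http://127.0.0.1:9200/$(todays_index items)/_search?pretty"
}
```

Running `smoke_test` while the agent is up should return a search result containing the test line if the sink is delivering events.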
