这是我第一次来这里,所以很抱歉如果我不张贴罚款,并为我的英语不好抱歉。
我正在尝试配置apache flume和elasticsearch接收器。一切正常,似乎运行正常,但启动代理时有两个警告;以下是:
2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies
我的代理配置:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
它启动了netcat,一切正常,但是我担心这些警告,我不明白。
3条答案
按热度按时间qcbq4gxm1#
在处理日志时,存在缺少依赖项的问题。
如果你看一下
ElasticSearchSink
文档中,您将看到以下内容:环境所需的elasticsearch和lucene核心jar必须放在apacheflume安装的lib目录中。elasticsearch要求客户机jar的主版本与服务器的主版本匹配,并且两者运行的jvm的次版本相同。如果这不正确,将出现序列化异常。要选择所需的版本,请首先确定elasticsearch的版本和目标集群正在运行的jvm版本。然后选择与主要版本匹配的elasticsearch客户端库。一个0.19.x客户端可以与一个0.19.x集群通信;0.20.x可以和0.20.x通话,0.90.x可以和0.90.x通话。确定elasticsearch版本后,读取pom.xml文件以确定要使用的正确lucene core jar版本。运行elasticsearchsink的flume代理也应该匹配目标集群运行的jvm到次要版本。
很可能您没有放置所需的javajar,或者版本不合适。
pkwftd7m2#
仅在flume/lib dir中添加了以下2个jar,并且有效,不必添加所有其他lucenejar:
ElasticSearch-1.7.1.jar
lucene-core-4.10.4.jar
启动Flume的命令:
确保将下面的内容添加到flume-env.sh中
flume aggregator config加载es中的数据:flume-aggregator.conf
acruukt93#
我找到了一个原因,似乎ApacheFlume1.6.0和ElasticSearch2.0不能正确地通信。
我从我修改过的第三个人那里找到了一个很好的Flume。
这是链接
这是我的最终配置,
这对我很有用。
谢谢你的回答。
p、 是的,我不得不搬走图书馆。