I am trying to add Elasticsearch as a sink for my data stream, using the following code:
import java.net.{InetAddress, InetSocketAddress}
import java.util.{ArrayList, HashMap}

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

val config = new HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "cluster")

val transports = new ArrayList[InetSocketAddress]
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))

// testing a simple setup: index only the language field of each element
val input: DataStream[String] = timedStream.map(_.language.toString)
input.print()

input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new HashMap[String, AnyRef]
    // Map stream fields to JSON properties, format:
    // json.put("json-property-name", streamField)
    json.put("data", element)
    Requests.indexRequest.index("test").`type`("test").source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
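For context, `timedStream` comes from the Twitter connector (hence flink-connector-twitter below). A simplified sketch of how it is built; the `Tweet` case class and the naive `parseTweet` here are placeholders standing in for my actual parsing code:

import java.io.FileInputStream
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.twitter.TwitterSource

case class Tweet(language: String)

// naive "lang" extraction; the real job uses a proper JSON parser
def parseTweet(json: String): Tweet =
  Tweet(""""lang":"(\w+)"""".r.findFirstMatchIn(json).map(_.group(1)).getOrElse("unknown"))

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.load(new FileInputStream("twitter.properties")) // holds the TwitterSource OAuth keys

val timedStream: DataStream[Tweet] = env
  .addSource(new TwitterSource(props)) // raw status JSON strings
  .map(parseTweet(_))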
My pom file dependencies are:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.10</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-twitter_2.10</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
        <version>1.3.2</version>
    </dependency>
</dependencies>
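For reference, the elasticsearch and Lucene versions these dependencies actually resolve to on the classpath can be checked by filtering the Maven dependency tree:

mvn dependency:tree -Dincludes=org.elasticsearch,org.apache.lucene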
Also, I have Elasticsearch 2.4.6 installed on my system, and while executing the program I get the following error:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoSuchFieldError: LUCENE_4_10_4
    at org.elasticsearch.Version.<clinit>(Version.java:230)
    at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:129)
    at org.apache.flink.streaming.connectors.elasticsearch2.Elasticsearch2ApiCallBridge.createClient(Elasticsearch2ApiCallBridge.java:65)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Where is the problem? I have already tried including elasticsearch 2.4.6 in the pom (shown below), but without success.
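The explicit pin I tried, to match the locally installed Elasticsearch, looked roughly like this:

<!-- explicit elasticsearch pin added alongside the Flink dependencies -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.4.6</version>
</dependency>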