flink streaming中的获取错误“java.lang.nosuchfielderror:lucene\u 4\u 10\u 4”

332nm8kg  于 2021-06-25  发布在  Flink
关注(0)|答案(0)|浏览(339)

我正在尝试使用代码为我的数据流添加elasticsearch作为接收器

  1. val config = new HashMap[String, String]
  2. config.put("bulk.flush.max.actions", "1")
  3. config.put("cluster.name", "cluster")
  4. val transports = new ArrayList[InetSocketAddress]
  5. transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
  6. // testing simple setup
  7. val input:DataStream[String] = timedStream.map( _.language.toString)
  8. input.print()
  9. input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction[String] {
  10. def createIndexRequest(element: String): IndexRequest = {
  11. val json = new java.util.HashMap[String, AnyRef]
  12. // Map stream fields to JSON properties, format:
  13. // json.put("json-property-name", streamField)
  14. json.put("data", element)
  15. Requests.indexRequest.index("test").`type`("test").source(json)
  16. }
  17. override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
  18. indexer.add(createIndexRequest(element))
  19. }
  20. }))

我的pom文件依赖关系是

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-core</artifactId>
  5. <version>1.3.2</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-clients_2.10</artifactId>
  10. <version>1.3.2</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.flink</groupId>
  14. <artifactId>flink-java</artifactId>
  15. <version>1.3.2</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.flink</groupId>
  19. <artifactId>flink-streaming-java_2.10</artifactId>
  20. <version>1.3.2</version>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.apache.flink</groupId>
  24. <artifactId>flink-streaming-scala_2.10</artifactId>
  25. <version>1.3.2</version>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.apache.flink</groupId>
  29. <artifactId>flink-connector-twitter_2.10</artifactId>
  30. <version>1.3.2</version>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.apache.flink</groupId>
  34. <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  35. <version>1.3.2</version>
  36. </dependency>
  37. </dependencies>

另外,我的系统中有elasticsearch 2.4.6版,在执行程序的过程中,我遇到以下错误:

  1. Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
  2. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
  3. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
  4. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
  5. at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  6. at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  7. at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
  8. at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
  9. at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  10. at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  11. at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  12. at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  13. Caused by: java.lang.NoSuchFieldError: LUCENE_4_10_4
  14. at org.elasticsearch.Version.<clinit>(Version.java:230)
  15. at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:129)
  16. at org.apache.flink.streaming.connectors.elasticsearch2.Elasticsearch2ApiCallBridge.createClient(Elasticsearch2ApiCallBridge.java:65)
  17. at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272)
  18. at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
  19. at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
  20. at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
  21. at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
  22. at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
  23. at java.lang.Thread.run(Thread.java:745)

问题出在哪里,我尝试过在pom中包含elasticsearch 2.4.6,但没有成功。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题