I am following this example and trying to use Jest as the client to write data into AWS Elasticsearch, using Scala 2.11:
libraryDependencies ++= Seq(
"org.apache.flink" % "flink-core" % "1.3.2",
"org.apache.flink" % "flink-scala_2.11" % "1.3.2",
"org.apache.flink" % "flink-streaming-scala_2.11" % "1.3.2",
"org.apache.flink" % "flink-shaded-hadoop2" % "1.4.0",
"org.apache.flink" %% "flink-yarn" % "1.3.2",
"com.typesafe.play" %% "play-json" % "2.6.2",
"org.apache.flink" % "flink-clients_2.11" % "1.3.2",
"org.apache.flink" %% "flink-avro" % "1.3.2",
"org.apache.flink" %% "flink-connector-filesystem" % "1.3.2",
"org.apache.flink" %% "flink-statebackend-rocksdb" % "1.3.2",
"io.searchbox" % "jest" % "5.3.3",
"com.amazonaws" % "aws-java-sdk" % "1.11.113",
"vc.inreach.aws" % "aws-signing-request-interceptor" % "0.0.20"
)
However, I get the following error:
java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
at com.google.common.reflect.TypeCapture.capture(TypeCapture.java:32)
at com.google.common.reflect.TypeToken.<init>(TypeToken.java:117)
at io.searchbox.core.Bulk$1.<init>(Bulk.java:96)
at io.searchbox.core.Bulk.getData(Bulk.java:96)
at io.searchbox.client.http.JestHttpClient.prepareRequest(JestHttpClient.java:116)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:64)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:60)
at ElasticsearchJestSink.flushDocumentBuffer(ElasticsearchJestSink.java:96)
at ElasticsearchJestSink.invoke(ElasticsearchJestSink.java:81)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at EventTimeJoinFunction$$anonfun$processElement2$1.apply(EventTimeJoinFunction.scala:54)
at EventTimeJoinFunction$$anonfun$processElement2$1.apply(EventTimeJoinFunction.scala:53)
at scala.collection.immutable.List.foreach(List.scala:381)
at EventTimeJoinFunction.processElement2(EventTimeJoinFunction.scala:53)
at EventTimeJoinFunction.processElement2(EventTimeJoinFunction.scala:15)
at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement2(KeyedCoProcessOperator.java:85)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:269)
at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
I also added a shade rule, but it does not help:
assemblyShadeRules in assembly := Seq(
ShadeRule.rename("com.google.common.**" -> "shade.com.google.common.@1")
.inLibrary("com.google.guava" % "guava" % "21.0")
.inProject
)
I believe Jest uses Guava 21.0, which is newer than the 20.0 suggested in other questions, but I still get the same error.
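One thing that may be worth trying, offered as a sketch rather than a confirmed fix: the three-argument `Preconditions.checkArgument(boolean, String, Object)` overload only exists from Guava 20.0 onward, so the error suggests that an older Guava `Preconditions` class is being loaded at runtime alongside a newer `TypeCapture`. The stack trace also still shows unshaded `com.google.common` names, which hints that the rename was not applied to the classes that call Guava (or that the job was not run from the assembled jar). With `.inLibrary(...).inProject`, sbt-assembly rewrites references only inside the Guava jar and the project's own classes, so Jest's classes would keep pointing at the original package. The variant below assumes the sbt-assembly plugin is already set up as in the rule above and applies the rename to every jar in the assembly with `.inAll`:

```scala
// build.sbt (assumes the sbt-assembly plugin, as in the rule above)
assemblyShadeRules in assembly := Seq(
  // Rename Guava's packages and rewrite references to them in every jar
  // on the assembly classpath, not only in Guava itself and the project,
  // so that Jest's calls also resolve to the relocated classes.
  ShadeRule.rename("com.google.common.**" -> "shade.com.google.common.@1").inAll
)
```

If this applies, the relocated classes should show up as `shade.com.google.common...` in the assembled jar, and the job has to be submitted from that jar for the rule to have any effect.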