flink在emr上写入s3

jhiyze9q  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(635)

我正在尝试使用emr和flink将一些输出写入s3。我使用的是Scala2.11.7、Flink1.3.2和EMR5.11。但是,我得到了以下错误:

java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:93)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:345)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:350)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
    at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:222)
    at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:78)
    at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:61)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)

我的 build.sbt 看起来像这样:

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-core" % "1.3.2",
  "org.apache.flink" % "flink-scala_2.11" % "1.3.2",
  "org.apache.flink" % "flink-streaming-scala_2.11" % "1.3.2",
  "org.apache.flink" % "flink-shaded-hadoop2" % "1.3.2",
  "org.apache.flink" % "flink-clients_2.11" % "1.3.2",
  "org.apache.flink" %% "flink-avro" % "1.3.2",
  "org.apache.flink" %% "flink-connector-filesystem" % "1.3.2"
)

我也找到了这个帖子,但它没有解决这个问题:emr上s3的外部检查点
我只是把输出放到s3: input.writeAsText("s3://test/flink") . 如有任何建议,将不胜感激。

ig9co6j1

ig9co6j11#

不确定flink shaded hadoop和emr版本的良好组合。经过几轮尝试和失败后,我能够使用新版本的 flink-shaded-hadoop2 -- "org.apache.flink" % "flink-shaded-hadoop2" % "1.4.0"

iqxoj9l9

iqxoj9l92#

您的问题可能是由于emr/yarn/flink在您自己的类之前加载了一些库,这导致了nosuchmethoderror:加载的类不是您提供的类,而是emr提供的类。注意jobmanager/taskmanager日志中的类路径。一个解决方案是将您自己的jar放在flink lib目录中,以便在emr之前加载它们。

相关问题