Flink BucketingSink crashes with NoClassDefFoundError: Lorg/apache/hadoop/fs/FileSystem

wbgh16ku · asked 2021-06-24 · in Flink
Follow (0) | Answers (1) | Views (354)

Flink versions tried: 1.4.0, 1.4.1, 1.4.2
When I try to run this simple Flink application:

    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements("a", "b", "c")
      .addSink(new BucketingSink[String]("file:///Users/joshlemer/projects/my-project/target/output"))

I get the following runtime exception:

    Exception in thread "main" java.lang.NoClassDefFoundError: Lorg/apache/hadoop/fs/FileSystem;
        at java.lang.Class.getDeclaredFields0(Native Method)
        at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
        at java.lang.Class.getDeclaredFields(Class.java:1916)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:72)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1550)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:184)
        at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1134)
        at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1036)
        at com.company.project.Job$.run(Job.scala:52)
        at com.company.project.Job$.main(Job.scala:28)
        at com.company.project.Job.main(Job.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FileSystem
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

This happens even though writing with dataStream.writeAsText(...) works fine.
My build.sbt is also fairly typical:

    val flinkVersion = "1.4.2"
    val flinkDependencies =
      Seq(
        "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
        "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
        "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion,
        "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion,
        "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
        "org.apache.flink" %% "flink-test-utils" % flinkVersion % "test",
        "org.apache.flink" % "flink-test-utils-junit" % flinkVersion % "test"
      )

Plus the extra idea.sbt that Flink recommends for IntelliJ users:

    lazy val mainRunner = project.in(file("mainRunner")).dependsOn(RootProject(file("."))).settings(
      // we set all provided dependencies to none, so that they are included in the classpath of mainRunner
      libraryDependencies := (libraryDependencies in RootProject(file("."))).value.map {
        module =>
          if (module.configurations.equals(Some("provided"))) {
            module.copy(configurations = None)
          } else {
            module
          }
      }
    )

I use mainRunner to run the application (mainRunner is set as the classpath of the run configuration).
I am confused about why this happens, and in particular why the package name starts with "Lorg" instead of "org".
Thanks!
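As an aside on the "Lorg" question: the error message is not showing a mangled package name but the JVM's internal field-descriptor syntax, which spells an object type as "L" + binary name + ";". It surfaces here because the ClosureCleaner reflects over the sink's declared fields (the `getDeclaredFields0` frame in the trace), and the failure is reported in descriptor form. The same syntax is visible at runtime in the names of array classes; a small demonstration (class names are illustrative, not from the original post):

```scala
// JVM field descriptors: object types are written "L<binary name>;",
// so a field of type org.apache.hadoop.fs.FileSystem has the descriptor
// "Lorg/apache/hadoop/fs/FileSystem;" -- exactly the form in the error.
object DescriptorDemo extends App {
  // Array classes expose descriptor syntax directly via getName:
  println(classOf[Array[String]].getName)     // [Ljava.lang.String;
  println(classOf[Array[Array[Int]]].getName) // [[I
}
```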


xqkwcwgp1#

From the Flink 1.4 release notes:

Starting with version 1.4, Flink can run without any Hadoop dependencies being present on the classpath. Aside from simply running without Hadoop, this enables Flink to dynamically use whatever Hadoop version is available on the classpath. You could, for example, download the Hadoop-free release of Flink but use it to run on any supported version of YARN, and Flink would dynamically use the Hadoop dependencies from YARN.

This also means that in cases where you use connectors to HDFS, such as the BucketingSink or RollingSink, you now have to ensure that you either use a Flink distribution with bundled Hadoop dependencies, or make sure to include the Hadoop dependencies when building the jar file for your application.
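For a build like the one in the question, including the Hadoop dependencies could look like the sketch below. This is an assumption about one way to satisfy the release-note requirement, not the only fix (using a Flink distribution with bundled Hadoop also works); the Hadoop version shown is a placeholder and should match the cluster you deploy to.

```scala
// Hypothetical addition to build.sbt: pull in the Hadoop filesystem classes
// that BucketingSink needs at runtime. "2.8.3" is an assumed version.
val hadoopVersion = "2.8.3"
val hadoopDependencies = Seq(
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-hdfs"   % hadoopVersion
)

// then merge with the existing list:
// libraryDependencies ++= flinkDependencies ++ hadoopDependencies
```

With the mainRunner trick from the question, these would land on the IDE classpath as well, since they are not marked "provided".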
