apache-flink: How to sink a stream to the Google Cloud Storage file system

mfuanj7w  posted on 2021-06-21 in Flink

I am trying to write a data stream to a file on the Google Cloud Storage file system, as shown below (using Flink 1.8 and Scala 2.11):

    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
    data.addSink(new BucketingSink[(String, Int)]("gs://url-try/try.txt"))

But I am facing the following error:

    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    Caused by: java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:379)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1227)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
        ... 8 more
    Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Cannot support file system for 'gs' via Hadoop, because Hadoop is not in the classpath, or some classes are missing from the classpath.
        at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:179)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
        ... 11 more
    Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hdfs/HdfsConfiguration
        at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:85)
        ... 12 more
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 13 more

I have seen several questions about this error, and following them I have set up the following:
Environment variable: FLINK_CONF_DIR
File flink-conf.yaml: fs.hdfs.hadoopconf: src/main/resources/core-site.xml
core-site.xml:

    <property>
      <name>fs.gs.impl</name>
      <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
      <description>The FileSystem for gs: (GCS) uris.</description>
    </property>
    <property>
      <name>fs.AbstractFileSystem.gs.impl</name>
      <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
      <description>The AbstractFileSystem for gs: (GCS) uris.</description>
    </property>

Here are my pom dependencies:

    <dependencies>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.8.0</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.8.0</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/commons-lang/commons-lang -->
      <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector -->
      <dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop3-1.9.16</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-fs</artifactId>
        <version>1.8.0</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-filesystem -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>1.8.0</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.2.0</version>
      </dependency>
      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-storage</artifactId>
        <version>1.35.0</version>
      </dependency>
    </dependencies>

Any help would be appreciated.

xesrikrc #1

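Based on the stack trace you posted, I see that you are having trouble writing to a GCS bucket using Flink and Scala.
There is a similar post that resolves this problem; please take a look at it.
If you have any further questions, don't hesitate to ask.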
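
Since the innermost cause in the trace is a ClassNotFoundException for org.apache.hadoop.hdfs.HdfsConfiguration, it may help to first confirm which classes are actually visible on the job's classpath. The following is only a minimal diagnostic sketch, not a fix: the object name `ClasspathCheck` and the helper `missingClasses` are hypothetical names introduced for illustration, and the class names checked are taken from the stack trace and the core-site.xml in the question.

```scala
object ClasspathCheck {
  // Class names taken from the question: the Hadoop FileSystem base class,
  // the class reported missing in the stack trace, and the GCS implementation
  // configured in core-site.xml.
  val required: Seq[String] = Seq(
    "org.apache.hadoop.fs.FileSystem",
    "org.apache.hadoop.hdfs.HdfsConfiguration",
    "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
  )

  // True if the class can be loaded (without running its static initializers).
  def isLoadable(name: String): Boolean =
    try {
      Class.forName(name, false, getClass.getClassLoader)
      true
    } catch {
      // LinkageError covers NoClassDefFoundError, e.g. a missing superclass.
      case _: ClassNotFoundException | _: LinkageError => false
    }

  // Returns the subset of names that cannot be loaded from the current classpath.
  def missingClasses(names: Seq[String]): Seq[String] =
    names.filterNot(isLoadable)

  def main(args: Array[String]): Unit = {
    val missing = missingClasses(required)
    if (missing.isEmpty) println("All checked classes are on the classpath.")
    else missing.foreach(n => println(s"Missing from classpath: $n"))
  }
}
```

Run it with the same classpath as the Flink job. If HdfsConfiguration is reported missing, the Hadoop artifacts on the classpath are likely incomplete or inconsistent; note that the pom in the question mixes hadoop-core 1.2.1 with hadoop-hdfs 3.2.0 and a hadoop3 build of gcs-connector, so aligning them on a single Hadoop version may be worth trying.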
