如何在flink中为google云存储创建recoverablewriter

bq9c1y66  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(404)

我想使用google云存储来编写 DataStream 从我的流媒体工作使用 StreamingFileSink .
为此,我使用googlecloudstorageconnectorforhadoop作为 org.apache.hadoop.fs.FileSystem ,并已使用 HadoopFileSystem 作为 org.apache.flink.core.fs.FileSystem 这为flink Package 了hadoop文件系统类。
我在gradle文件中包含了以下依赖项: compile( "com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2" ) compile( "org.apache.flink:flink-connector-filesystem_2.11:1.6.0" ) provided( "org.apache.flink:flink-shaded-hadoop2:1.6.0" ) 现在,根据我对源代码[1][2][3]的了解,flink动态加载 FileSystemFactory 运行时(通过 java.util.ServiceLoader )还加载 HadoopFsFactory 在运行时(通过反射,如果它在类路径中找到hadoop),然后使用它来创建 FileSystem .
我面临的问题是 RecoverableWriter for hadoop兼容包只支持 hdfs 文件方案(我使用 gs )因此在运行时抛出一个错误。
所以,我 extended 这个 HadoopFileSystem (我打电话来了 GCSFileSystem )以及 @overrided 这个 FileSystem#createRecoverableWriter() 返回的自定义实现 RecoverableWriter 然后处理恢复等细节,并创建相应的 FileSystemFactory 班级(班级装饰有 @AutoService 因此应该被 ServiceLoader ).
该设置在本地和本地docker集群上运行良好(实际上,gcs连接器由于缺乏授权而抛出错误,但这很好,因为这意味着 FileSystem 但当我将它部署到运行在google计算引擎上的docker集群时,它失败了。
在gce上,默认 HadoopFileSystem 加载并在执行方案时引发异常 gs 而不是 hdfs ,但我的假设是它应该加载我的工厂实现,因此不应该出现此错误。
我在flinkv1.6.0上,使用dockerflink在docker上作为长时间运行的会话集群运行

ohtdti5x

ohtdti5x1#

答案在最后一行!!
我正在运行一个长期的生活会话集群,当我的 job.jar 被执行死刑 FileSystem 初始化已经完成,工厂已经加载!所以,当我添加作业时,没有进行初始化调用。
解决方案是什么?有几种方法取决于您如何部署作业:
独立:添加包含 FileSystem 执行 lib/ 目录
群集( manual ):添加包含 FileSystem 执行 lib/ 您的目录 zip 或者像什么的。
群集( docker )( long-living ):创建自定义容器映像并将jar添加到 lib/ 该图像的目录。
群集( docker )( per-job-session ):创建自定义容器映像并添加所有jar(包含 FileSystem 和你的工作等)的 lib/ 目录,请在此处阅读有关每个作业会话的详细信息。

相关问题