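I am trying to connect a StreamingFileSink to Azure Blob Storage. The documentation currently does not mention Azure, but I was hoping it would work through the file system abstraction.
After analyzing the error, I assume this feature is currently out of scope for Azure Blob Storage.
I want to make sure I have not made a mistake, and if there is a way to get this working I would appreciate any pointers.
What I have found so far:
This is the exception I am seeing: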
java.lang.NoClassDefFoundError: org/apache/flink/fs/azure/common/hadoop/HadoopRecoverableWriter
at org.apache.flink.fs.azure.common.hadoop.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:129)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:117)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:288)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:402)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
So it looks as if the HadoopRecoverableWriter class is missing.
However, in the pom.xml of flink-azure-fs-hadoop I can find this:
<!-- shade Flink's Hadoop FS adapter classes -->
<relocation>
    <pattern>org.apache.flink.runtime.fs.hdfs</pattern>
    <shadedPattern>org.apache.flink.fs.azure.common.hadoop</shadedPattern>
</relocation>
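To make the relocation concrete, here is a minimal sketch of the renaming it implies. This is not the Maven Shade Plugin's actual code; the class RelocationSketch and its method are hypothetical, and the prefix handling is simplified. It only illustrates why the stack trace names org/apache/flink/fs/azure/common/hadoop/HadoopRecoverableWriter even though the adapter classes originate in org.apache.flink.runtime.fs.hdfs.

```java
public class RelocationSketch {
    // Values copied from the <relocation> element in the pom.xml.
    static final String PATTERN = "org.apache.flink.runtime.fs.hdfs";
    static final String SHADED = "org.apache.flink.fs.azure.common.hadoop";

    /** Simplified model of the relocation: rewrite the package prefix, leave other names alone. */
    public static String relocate(String className) {
        if (className.startsWith(PATTERN + ".")) {
            return SHADED + className.substring(PATTERN.length());
        }
        return className;
    }

    public static void main(String[] args) {
        System.out.println(relocate("org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter"));
    }
}
```

Under this model, the class the plugin fails to load is the relocated counterpart of the original Hadoop adapter's HadoopRecoverableWriter.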
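So the shaded package should contain this class.
I then inspected flink-azure-fs-hadoop-1.10.0.jar and the contents of the shaded package, and the recoverable writer is missing: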
HadoopBlockLocation.class
HadoopDataInputStream.class
HadoopDataOutputStream.class
HadoopFileStatus.class
HadoopFileSystem.class
HadoopFsFactory.class
HadoopFsRecoverable.class
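For anyone who wants to reproduce this check, the listing above can be produced with a few lines of plain Java. This is only a sketch: the class name ShadedJarCheck is hypothetical, and the default jar path assumes the file sits in the working directory.

```java
import java.io.IOException;
import java.util.List;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

public class ShadedJarCheck {
    /** Returns the sorted names of all .class entries in the jar that start with the given prefix. */
    public static List<String> classesUnder(String jarPath, String packagePrefix) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.stream()
                    .map(entry -> entry.getName())
                    .filter(name -> name.startsWith(packagePrefix) && name.endsWith(".class"))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the jar is in the current directory unless a path is passed as the first argument.
        String jarPath = args.length > 0 ? args[0] : "flink-azure-fs-hadoop-1.10.0.jar";
        String prefix = "org/apache/flink/fs/azure/common/hadoop/";
        List<String> classes = classesUnder(jarPath, prefix);
        classes.forEach(System.out::println);
        boolean hasWriter = classes.contains(prefix + "HadoopRecoverableWriter.class");
        System.out.println("HadoopRecoverableWriter present: " + hasWriter);
    }
}
```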
More digging showed that the shade section of the pom.xml actually contains a filter that excludes the recoverable writer:
<filter>
    <artifact>org.apache.flink:flink-hadoop-fs</artifact>
    <excludes>
        <exclude>org/apache/flink/runtime/util/HadoopUtils</exclude>
        <exclude>org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*</exclude>
    </excludes>
</filter>
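The effect of the second exclude can be illustrated with a small sketch. It approximates the wildcard with a regular expression in which `*` matches any characters within one path segment. The Maven Shade Plugin uses its own matcher, so treat this as an assumption that happens to hold for this simple pattern; the class ExcludePatternCheck is hypothetical. The sketch is consistent with the jar listing: HadoopFsRecoverable.class survives because its name does not start with HadoopRecoverable, while HadoopRecoverableWriter.class is filtered out.

```java
import java.util.regex.Pattern;

public class ExcludePatternCheck {
    // Pattern copied from the <exclude> element in the pom.xml.
    static final String EXCLUDE = "org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*";

    /** Simplified wildcard translation: '*' matches any run of characters except '/'. */
    public static Pattern toRegex(String wildcard) {
        String[] literals = wildcard.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append("[^/]*");
            }
            regex.append(Pattern.quote(literals[i]));
        }
        return Pattern.compile(regex.toString());
    }

    /** True if the given class file path (before relocation) matches the exclude pattern. */
    public static boolean isExcluded(String classFilePath) {
        return toRegex(EXCLUDE).matcher(classFilePath).matches();
    }

    public static void main(String[] args) {
        String pkg = "org/apache/flink/runtime/fs/hdfs/";
        for (String name : new String[] {"HadoopRecoverableWriter", "HadoopFsRecoverable", "HadoopFileSystem"}) {
            System.out.println(name + " excluded: " + isExcluded(pkg + name + ".class"));
        }
    }
}
```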