我有一个flink应用程序,在aws emr中运行,具有高并行度(400)。它使用bucketingsink(使用rocksdb后端进行检查点)将kafka发送到s3。使用“s3a://”前缀定义目的地。flink作业是一个连续运行的流应用程序。在任何给定的时间,所有的worker组合起来都有可能生成/写入400个文件(由于400个并行性)。几天后,其中一名工人将失败,但有一个例外:
org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Pelase try again. (Service: Amazon S3; Status Code: 200 InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java: 178)
at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java: 1803)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
当bucketingsink创建新零件文件时,这似乎是随机发生的。奇怪的是,这是随机发生的,当它发生时,它发生在一个平行的flink工人(不是全部)身上。此外,当发生这种情况时,flink作业将转换为失败状态,但flink作业不会重新启动,也不会从上一个成功的检查点恢复/恢复。造成这种情况的原因是什么?应该如何解决?此外,如何将作业配置为从最后一个成功的检查点重新启动/恢复,而不是保持失败状态?
1条答案
按热度按时间jgwigjjp1#
我认为这是bucketing sink和s3的已知行为,建议的解决方案是在flink1.7.0中使用闪亮的新streamingfilesink。
基本上,bucketing sink期望写入和重命名像在真实的文件系统中一样立即发生,但对于s3这样的对象存储来说,这不是一个好的假设,因此bucketing sink最终会出现导致间歇性问题的竞争条件。这是一张描述问题的jira罚单,相关罚单更充实了一点。吉拉·Flink-9752