我有一个Apache Flink应用程序,我已经部署在Kinesis数据分析。
此应用程序从Kafka读取数据并写入S3。它写入的S3存储桶结构是使用BucketAssigner计算的。BucketAssigner的精简版本here
我遇到的问题是,假设我们必须写入以下目录结构:PUT
在发出PUT
请求之前,它会发出以下HEAD
请求:
HEAD /folder1
HEAD /folder1/folder2
HEAD /folder1/folder2/folder3/
然后发出PUT
请求。
它对每个请求都这样做,这导致了S3速率限制,并在我的FLink应用程序中产生了反压力。
我发现有人在使用BucketingSink时遇到了类似的问题:https://lists.apache.org/thread/rbp2gdbxwdrk7zmvwhd2bw56mlwokpzz
那里提到的解决方案是切换到StreamingFileSink,这就是我正在做的。
如何在StreamingFileSink中修复此问题?
我的SinkConfig如下:
StreamingFileSink
.forRowFormat(new Path(s3Bucket), new JsonEncoder<>())
.withBucketAssigner(bucketAssigner)
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(60000)
.build())
.build()
JsonEncoder获取该对象并将其转换为json,然后写出this之类的字节
我已经在这个问题中详细描述了整个渠道的工作方式,如果这对您有所帮助的话:Heavy back pressure and huge checkpoint size
暂无答案!
目前还没有任何答案,快来回答吧!