flinkhadoop在多个并行桶中的性能

tcbh2hod  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(549)

我在调查一个flink工作的性能,它将数据从Kafka传输到s3接收器。我们正在用bucketingsink写Parquet文件。bucketing逻辑将具有文件夹的消息按数据类型、租户(客户)、日期时间、提取id等进行划分。这将导致每个文件存储在由9-10层组成的文件夹结构中(s3\ U bucket:/1/2/3/4/5/6/7/8/9/myfile…)
如果数据以租户类型的突发消息的形式分发,我们在编写时会看到良好的性能,但是当数据更多地以白噪声的形式分布在数千个租户、几十个数据类型和多个提取id上时,我们会有令人难以置信的性能损失(以300倍的顺序)
附加一个调试器,这个问题似乎与s3上同时打开的写入数据的处理程序的数量有关。更具体地说:

在研究用于写入s3的hadoop库时,我发现了一些可能的改进:

<name>fs.s3a.connection.maximum</name>
      <name>fs.s3a.threads.max</name>
      <name>fs.s3a.threads.core</name>
      <name>fs.s3a.max.total.tasks</name>

但这些都没有对吞吐量产生很大影响。我还尝试将文件夹结构扁平化,以写入单个键(1\u 2\u 3…),但这也没有带来任何改进。
注意:这些测试是在Flink1.8上使用hadoop文件系统(bucketingsink)完成的,使用hadoop fs库2.6.x写入s3(就像我们使用ClouderaCDH5.x作为保存点一样),所以我们不能切换到streamingfilesink。

8zzbczxx

8zzbczxx1#

在科斯塔斯的建议下https://lists.apache.org/thread.html/50ef4d26a1af408df8d9abb70589699cb6b26b2600ab6f4464e86ea4%40%3cdev.flink.apache.org%3e
减速的罪魁祸首是这段代码:https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/bucketingsink.java#l543-l551型
仅此一项就需要4-5秒,打开文件总共需要6秒。检测调用的日志:

2020-02-07 08:51:05,825 INFO  BucketingSink  - openNewPartFile FS verification
2020-02-07 08:51:09,906 INFO  BucketingSink  - openNewPartFile FS verification - done
2020-02-07 08:51:11,181 INFO  BucketingSink  - openNewPartFile FS - completed partPath = s3a://....

这与bucketing接收器的默认设置(60秒不活动滚动)一起使用https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/bucketingsink.java#l195 意味着当我们完成任务时,一个槽上有超过10个平行桶创建最后一个bucket第一个bucket就过时了,所以需要旋转生成阻塞情况。
我们通过替换 BucketingSink.java 以及删除上述fs检查:

LOG.debug("Opening new part file FS verification");
        if (!fs.exists(bucketPath)) {
            try {
                if (fs.mkdirs(bucketPath)) {
                    LOG.debug("Created new bucket directory: {}", bucketPath);
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Could not create new bucket path.", e);
            }
        }
        LOG.debug("Opening new part file FS verification - done");

正如我们所看到的,没有它,Flume可以正常工作,现在打开文件需要1.2秒。
此外,我们将默认的非活动阈值设置为5分钟。通过这些更改,我们可以轻松地处理每个插槽200多个存储桶(一旦作业提高速度,它将在所有插槽上接收,因此推迟非活动超时)

相关问题