我在调查一个flink工作的性能,它将数据从Kafka传输到s3接收器。我们正在用bucketingsink写Parquet文件。bucketing逻辑将具有文件夹的消息按数据类型、租户(客户)、日期时间、提取id等进行划分。这将导致每个文件存储在由9-10层组成的文件夹结构中(s3\ U bucket:/1/2/3/4/5/6/7/8/9/myfile…)
如果数据以租户类型的突发消息的形式分发,我们在编写时会看到良好的性能,但是当数据更多地以白噪声的形式分布在数千个租户、几十个数据类型和多个提取id上时,我们会有令人难以置信的性能损失(以300倍的顺序)
附加一个调试器,这个问题似乎与s3上同时打开的写入数据的处理程序的数量有关。更具体地说:
在研究用于写入s3的hadoop库时,我发现了一些可能的改进:
<name>fs.s3a.connection.maximum</name>
<name>fs.s3a.threads.max</name>
<name>fs.s3a.threads.core</name>
<name>fs.s3a.max.total.tasks</name>
但这些都没有对吞吐量产生很大影响。我还尝试将文件夹结构扁平化,以写入单个键(1\u 2\u 3…),但这也没有带来任何改进。
注意:这些测试是在Flink1.8上使用hadoop文件系统(bucketingsink)完成的,使用hadoop fs库2.6.x写入s3(就像我们使用ClouderaCDH5.x作为保存点一样),所以我们不能切换到streamingfilesink。
1条答案
按热度按时间8zzbczxx1#
在科斯塔斯的建议下https://lists.apache.org/thread.html/50ef4d26a1af408df8d9abb70589699cb6b26b2600ab6f4464e86ea4%40%3cdev.flink.apache.org%3e
减速的罪魁祸首是这段代码:https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/bucketingsink.java#l543-l551型
仅此一项就需要4-5秒,打开文件总共需要6秒。检测调用的日志:
这与bucketing接收器的默认设置(60秒不活动滚动)一起使用https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/bucketingsink.java#l195 意味着当我们完成任务时,一个槽上有超过10个平行桶创建最后一个bucket第一个bucket就过时了,所以需要旋转生成阻塞情况。
我们通过替换
BucketingSink.java
以及删除上述fs检查:正如我们所看到的,没有它,Flume可以正常工作,现在打开文件需要1.2秒。
此外,我们将默认的非活动阈值设置为5分钟。通过这些更改,我们可以轻松地处理每个插槽200多个存储桶(一旦作业提高速度,它将在所有插槽上接收,因此推迟非活动超时)