我在Flink的工作中使用Kafka的资料流,一次阅读50个主题,如下所示:
FlinkKafkaConsumer<GenericRecord> kafkaConsumer = new FlinkKafkaConsumer<GenericRecord>(
Pattern.compile("TOPIC_NAME[1-50].stream"), // getting data stream from all topics
<DeserializationSchema>, //using avro schema
properties); // auto.commit.interval.ms=1000 ...
还有一些操作符,比如:filter->map->keyby->window->aggreagate->sink
我能得到的最大吞吐量是每秒10k到20k条记录,考虑到源发布了数十万个事件,这是相当低的,我可以清楚地看到消费者落后于生产者。我甚至试着移除Flume和其他操作员,以确保没有背压,但它仍然是一样的。我正在将我的应用程序部署到amazon kinesis data analytics,并尝试了几种并行设置,但这些设置似乎都没有提高吞吐量。
我有什么遗漏吗?
1条答案
按热度按时间roejwanj1#
有几件事会显著影响吞吐量。
无效的序列化通常是导致吞吐量低的一个主要因素。请参阅flink serialization tuning vol.1:选择序列化程序-如果可以的话,以获取有关此主题的详细信息。avro通用记录序列化程序还不错,但是您是否携带了实际上不需要的数据?
您是否正在更改管道中的任何位置的并行度?那太贵了。
对于kinesis数据分析,您必须使用rocksdb状态后端,它的吞吐量比基于堆的状态后端少得多。但是拥有正确的配置会有很大的帮助。您应该为rocksdb工作目录使用可用的最快本地磁盘(ssd,或者在极端情况下,可能需要ram磁盘)。确保示例类型提供足够的iops。给rocksdb足够的内存。布鲁姆过滤器是值得启用的,如果你做了大量的阅读。请参阅flink中磁盘对rocksdb状态后端的影响:一个案例研究,以获得有关使用rocksdb的更多信息。
您可以尝试禁用检查点作为一个实验。如果有帮助的话,这会提供一些线索。
某些网络设置会影响吞吐量。默认值通常提供了不错的性能,但是如果您修改了它们,这是值得研究的。