我们的flink作业包含一个过滤器,按会话id键,然后是间隔30分钟的会话窗口。会话窗口需要累积会话的所有事件,并使用 ProcessWindowFunction
.
我们使用flink 1.9128个容器,总共有20g内存来运行我们的作业,截止比是0.3。我们正在进行增量检查点。
当会话窗口开始触发时 process
功能,网络缓冲区使用率开始变得相当高,然后我们开始得到Kafka输入滞后。我们的环境:
state.backend: rocksdb
state.checkpoints.dir: hdfs://nameservice0/service
state.backend.rocksdb.memory.managed: true
state.backend.incremental: true
# https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
state.backend.rocksdb.memory.write-buffer-ratio: 0.6
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
state.backend.rocksdb.block.blocksize: 16mb
state.backend.rocksdb.writebuffer.count: 8
state.backend.rocksdb.writebuffer.size: 256mb
state.backend.rocksdb.timer-service.factory: heap
containerized.heap-cutoff-ratio: 0.25
taskmanager.network.memory.fraction: 0.85
taskmanager.network.memory.min: 512mb
taskmanager.network.memory.max: 7168mb
taskmanager.network.memory.buffers-per-channel: 8
taskmanager.memory.segment-size: 4mb
taskmanager.network.memory.floating-buffers-per-gate: 16
taskmanager.network.netty.transport: poll
一些图表:
任何建议都将不胜感激!
2条答案
按热度按时间dba5bblo1#
如果我可以访问详细信息,下面是我将查看的内容,以尝试提高此应用程序的性能:
(1) 可以重新实现windows来进行增量聚合吗?目前,windows正在建立一个可能相当长的事件列表,它们只在会话结束时处理这些列表。这显然需要足够长的时间来引起Kafka的背压。如果您可以预先聚合会话结果,这将使处理过程变得均匀,问题应该会消失。
不,我没有反驳我在这里说的话。如果我不清楚,就告诉我。
(2) 你已经设置了很多额外的网络缓冲。这通常适得其反;您希望背压能够快速反弹并限制源,而不是将更多数据推入flink的网络缓冲区。
您最好减少网络缓冲,如果可能的话,使用可用资源来提供更多的插槽。当一个插槽忙于处理刚刚结束的长会话的内容时,拥有更多的插槽将减少影响。给rocksdb更多的内存也会有所帮助。
(3) 看看是否可以优化序列化。最好的序列化程序和最差的序列化程序的吞吐量可能相差10倍。请参见flink序列化调整。如果记录中有不需要的字段,请删除它们。
(4) 看看调谐摇滚SDB。确保您使用的是rocksdb可用的最快本地磁盘,如本地SSD。避免将网络连接存储(如ebs)用于
state.backend.rocksdb.localdir
.nom7f22z2#
我不知道flink的内部结构,但原因可能与会话窗口有关。我的意思是,如果有这么多会话操作具有相同的间隔(30分钟),那么所有会话操作都将在同一时间执行,这会造成延迟。