flink:处理背压(图片来源:kafka,sink:elasticsearch)

vlju58qv  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(579)

我有一份flink的工作,从kafka那里读取数据,执行某些聚合,并将结果写入elasticsearch索引。我看到源上有很大的反压力。高的反压力导致数据从kafka中缓慢读取,我看到数据在网络堆栈中排队(netstat recvq显示源kafka连接中卡住了数万字节的数据,数据最终被读取),这反过来又导致数据在延迟后被放入elasticsearch,并且延迟不断增加。
源每分钟产生约17500条记录,flink作业为每个传入记录分配(事件)时间戳,执行12种不同类型的keyby,在1分钟的滚动窗口中调整事件,在这个键控的windows流上执行聚合操作,最后将结果写入12个不同的elasticsearch索引(每次写入都是一个insert)。
问题是,写入elasticsearch的数据开始滞后,因此 Jmeter 板结果(构建在elasticsearch之上)不再是实时的。我的理解是,这是因为背压的建立。不知道如何解决这个问题。集群本身是一个基于vm的单节点独立集群,具有64gbram(taskmanager配置为使用20gb)和16个vcpu。没有证据表明cpu或内存受到限制(如htop所示)。只有一个任务管理器,这是这个集群上唯一的flink作业。
我不确定这个问题是由于集群中的一些本地资源问题还是由于对elasticsearch的写入速度太慢。我已经将setbulkflushmaxactions设置为1(正如我在任何地方看到的所有代码示例中所做的那样),是否还需要设置setbulkflushinterval和/或setbulkflushmaxsizeinmb?
我经历过https://www.da-platform.com/flink-forward-berlin/resources/improving-throughput-and-latency-with-flinks-network-stack 但尚未尝试幻灯片19中列出的调整选项,不确定要为这些参数设置什么值。
最后,我认为在intellijide中运行同一个作业时不会遇到同样的问题。
我将排除所有聚合,然后逐个将它们添加回去,以查看是否存在触发此问题的特定聚合?
任何具体的指针将不胜感激,还将尝试setbulkflushinterval和setbulkflushmaxsizeinmb。
2019年1月29日的更新1似乎两个节点都以非常高的堆使用率运行,因此gc一直在运行,试图清除jvm中的空间。将物理内存从16 GB增加到32gb,然后重新启动节点。希望能解决这个问题,再过24小时就知道了。

a64a0gku

a64a0gku1#

通常在这种情况下,问题出在与外部数据存储的连接上——要么带宽不足,要么对每条记录进行同步写入,而不是成批写入。
验证elasticsearch接收器是否存在问题(而不是网络堆栈配置)的一个简单方法是,将其替换为丢弃接收器(一个什么也不做的接收器),以查看这是否解决了问题。像这样的

public static class NullSink<OUT> implements SinkFunction<OUT> {
    @Override
    public void invoke(OUT value, Context context) throws Exception {
    }
}

更新:
问题是您已将bulk.flush.max.actions设置为1,从而阻止了与elasticsearch服务器的连接中的任何缓冲。

kkbh8khc

kkbh8khc2#

通过增加(加倍)elasticsearch集群节点上的ram并将索引刷新间隔(在所有elasticsearch索引上)设置为30s(默认值为1s),问题得到了解决。在做出这些改变后,flink的背压被报告为正常,没有数据滞后,一切看起来都很正常。

相关问题