apache flink检查点卡住

vxf3dgd4  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(478)

我们正在运行一个liststate在300gb到400gb之间的作业,有时列表会增长到几千个。在我们的用例中,每个项都必须有自己的ttl,因此我们使用rocksdb后端在s3上为这个liststate的每个新项创建一个新计时器。
目前大约有1.4亿多个计时器(将在event.timestamp+40天触发)。
我们的问题是,作业的检查点突然被卡住,或者非常慢(比如几个小时内1%),直到它最终超时。通常会停止(flink Jmeter 盘显示 0/12 (0%) 前面的几行显示 12/12 (100%) )在一段非常简单的代码上:

[...]
    val myStream = env.addSource(someKafkaConsumer)
      .rebalance
      .map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
      .uid("src_kafka_stream")
      .name("some_name")

      myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
        .getSideOutput(outputTag)
        .keyBy(_.name)
        .addSink(sink)
[...]

更多信息:
至少一次检查点模式似乎比一次更容易卡住
几个月前,这个州的数据容量达到了1.5tb,我认为数十亿的计时器没有任何问题。
运行两个任务管理器的机器上的ram、cpu和网络看起来都很正常 state.backend.rocksdb.thread.num = 4 第一个事件发生在我们收到大量事件(大约几百万分钟)的时候,而不是上一个事件。
所有的事件都来自Kafka的主题。
在至少一次检查点模式下,作业仍然正常运行和使用。
这是第二次发生在我们身上,拓扑运行得非常好,每天有几百万个事件,突然停止检查点。我们不知道是什么原因造成的。
任何人都能想到什么会突然导致检查站卡住?

x3naxklr

x3naxklr1#

一些想法:
如果有多个计时器或多或少同时触发,那么这场计时器风暴将阻止其他任何事情的发生——任务将循环调用计时器,直到没有更多的计时器要触发为止,在此期间,它们的输入队列将被忽略,检查点障碍将不会继续。
如果这是你的麻烦的原因,你可能会添加一些随机抖动到你的计时器,使事件风暴不会变成风暴以后计时器。重新组织使用状态ttl可能是另一种选择。
如果堆上有很多计时器,这会导致非常高的gc开销。这不一定会使工作失败,但会使检查点不稳定。在这种情况下,将计时器移到rocksdb中可能会有所帮助。
另外:由于您使用的是rocksdb,因此以时间为键从liststate切换到mapstate,可以删除单个条目,而无需在每次更新后重新序列化整个列表(对于rocksdb,mapstate中的每个键/值对都是一个单独的rocksdb对象。)通过这种方式提高清理效率可能是最好的补救方法。

相关问题