我正在使用 Flink 1.4.1
处理事务事件和hdf以存储检查点信息以实现容错。
创建了一个作业来聚合有关客户机、星期几和一天中的小时数的信息,从而创建了一个配置文件,如下面的代码所示。
val stream = env.addSource(consumer)
val result = stream
.map(openTransaction => {
val transactionDate = openTransaction.get("transactionDate")
val date = if (transactionDate.isTextual)
LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
else
transactionDate.asLong
(openTransaction.get("clientId").asLong, openTransaction.get("amount").asDouble, new Timestamp(date))
})
.keyBy(0)
.window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
.sum(1)
在上面的代码中,流有三个字段:“transactiondate”、“clientid”和“amount”。我们通过clientid生成一个键控流,并生成一个滑动窗口来对数量求和。我们的数据库中有大约100000个独特的活动客户机。
运行一段时间后,作业使用的总ram稳定在36 gb,但hdfs中存储的检查点仅使用3 gb。有没有办法减少作业的ram使用量,可以通过配置flink的复制因子或者使用rocksdb?
1条答案
按热度按时间cvxl0en21#
对于这种状态大小,使用rocksdb绝对是您应该考虑的事情,并且根据使用模式的不同,当它通过只复制新的或更新的sst来增量执行它时,可以有更小的检查点。
要知道的一些事情,请记住:
每个有状态操作符并行子任务都有自己的rocksdb示例。
如果切换到rocksdb进行检查点设置,并且它开始运行得比您需要的慢,请确保您使用的序列化尽可能有效。
flink提供了一些基于备份文件系统的预定义选项,请确保选择适当
如果预定义的选项不适用于您,您可以覆盖rocksdb后端的optionsfactory并微调各个rocksdb选项
另一件事要注意的是,在flink与键控时间窗口内存使用是“计时器”可以使用大量的内存,如果你进入数十万或数百万。flink计时器是基于堆的(在本文撰写时),并且独立于状态后端进行同步检查点。