当我们使用reducingstate recordstore.add(r)时,发现性能波动如下所示,
无还原状态:稳定性能图
带还原状态:性能波动图
整体性能(下降超过100%!):不带reducingstate.add vs带reducingstate.add
它可以很容易地用一个简单的应用程序复制,没有检查点,只是简单地保存记录,还有简单的“求和”归约函数(实际上空函数也会看到相同的结果)。如有任何意见,我们将不胜感激。多么明显的问题啊。
基本上,应用程序只是将记录存储到状态中,我们测量“jsontranslator”中每秒有多少记录,如图所示。两者之间的区别只有一行,comment/un comment“recstore.add(r)”。
了解状态会影响性能,但这就是它的工作方式吗?
DataStream<String> stream = env.addSource(new GeneratorSource(loop);
DataStream<JSONObject> convert = stream.map(new JsonTranslator(statsdUrl))
.keyBy(new KeySelector<JSONObject, AggregationKey>() {... ...})
.process(new ProcessAggregation(aggrDuration, statsdUrl))
.map(new PassthruFunction(statsdUrl));
public class ProcessAggregation extends ProcessFunction<JSONObject, JSONObject> {
private ReducingState<JSONObject> recStore;
public void processElement(JSONObject r, Context ctx, Collector<JSONObject> out) {
recStore.add(r); //this line make the difference
}
2条答案
按热度按时间czfnxgou1#
我用你共享的代码做了一些实验。我只在笔记本电脑上运行过。我保留了所有的statsd代码,但我没有运行statsd。相反,我配置了
web.refresh-interval
至1秒并观察numRecordsOutPerSecond
在flink web Jmeter 板中。我唯一改变的就是修改GeneratorSource
连续运行以便观察稳态行为。这就是我所看到的:
除了在工作开始的时候,我没有看到吞吐量有任何剧烈的波动。有一个大约30秒的初始周期,在此期间吞吐量稳定地上升到一个值,然后保持相当一致(在初始启动阶段之后,它上下变化约10%,有或没有
ReducingState
).将flink版本从1.3.2升级到1.5.0,整体吞吐量提高了近2倍。这并不奇怪,因为自从1.3版以来,flink的网络栈已经做了很多工作。
评论
mergedRecordStore.add(r);
还将吞吐量提高了约2倍。看着代码,我看到了一件让人痛苦的事。您正在使用jsonobjects进行键控、序列化/反序列化和缩减。这个很贵。最好将json转换成pojo或tuple,这样使用起来会更便宜。
wfsdck302#
如果您的任务可以在一台只有少量线程的机器上轻松完成,那么
flink
如果执行托管状态对性能的影响太大,则可能会造成过度杀伤力。也就是说,你不需要直接使用
ReducingState
这样,您通常会使用aggregate
以及reduce
窗口运算符上的函数(还有,这里的窗口是什么?)输出结果时并不清楚。你是不是在不断地释放能量?您的源代码是否生成进入多个键的数据?
您使用的是默认状态后端还是rocksdb?
此外,您还可以考虑使用方便
sum
flink提供的函数,该函数允许您指定要添加的字段。