我当前的flink应用程序在3个节点上运行48个任务槽。我还使用rocksdb作为状态管理(我不关心flink中的保存点和检查点机制,我只是创建了一个状态,几乎5分钟(ttl)
但是,所有节点的内存消耗总是在增加,我必须通过stop-cluster.sh停止flink应用程序,然后重新启动。
我有许多基于客户端ip地址的keyedstreams。每天都有数百万用户访问我的网站。
一些键控流正在使用 StateTtlConfig
当其他人使用 onTimer
机制。
我对内存消耗(或泄漏)的假设是:调用registerprocessingtimer会创建一个保存在内存中的条目,因为有很多ip地址,所以我会有很多条目,而且内存消耗总是在增加?
我是否应该删除ontimer解决方案并仅使用statettlconfig(我使用ontimer方法是因为在statettlconfig中,每次我更新state时,它也会更新ttls,从而在我的应用程序中创建无效数据)
国家管理范例
// EXAMPLE FOR STATETTLCONFIG
public class State1 extends KeyedProcessFunction<Tuple, ..., ...>{
private transient ValueState<Integer> state;
@Override
public void open(Configuration parameters) throws Exception{
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(2))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInBackground()
.build();
ValueStateDescriptor<Integer> valueStateDesc = new ValueStateDescriptor<Integer>(
..
);
valueStateDesc.enableTimeToLive(ttlConfig);
state = getRuntimeContext().getState(valueStateDesc);
}
@Override
public void processElement(LogObject value, Context ctx, Collector<LogObject> out) throws Exception{
Integer stateVal = valueState.value();
// do something and update state
}
}
// EXAMPLE FOR ONTIMER METHOD
public class State2 extends KeyedProcessFunction<Tuple, ..., ...> {
private transient ValueState<Integer> state;
@Override
public void open(Configuration parameters){
ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>(
...;
state = getRuntimeContext().getState(stateDesc);
}
@Override
public void processElement(LogObject value, Context ctx, Collector<LogObject> out) throws Exception{
Integer stateVal = state.value();
if (stateVal == null)
{
stateVal = 0;
ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 5 MINS);
}
stateVal ++;
// do something and update state
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<LogObject> out)
{
stateVal.clear();
}
}
暂无答案!
目前还没有任何答案,快来回答吧!