内存消耗总是在增加

myzjeezk  于 2021-07-15  发布在  Flink
关注(0)|答案(0)|浏览(242)

我当前的flink应用程序在3个节点上运行48个任务槽。我还使用rocksdb作为状态管理(我不关心flink中的保存点和检查点机制,我只是创建了一个状态,几乎5分钟(ttl)
但是,所有节点的内存消耗总是在增加,我必须通过stop-cluster.sh停止flink应用程序,然后重新启动。
我有许多基于客户端ip地址的keyedstreams。每天都有数百万用户访问我的网站。
一些键控流正在使用 StateTtlConfig 当其他人使用 onTimer 机制。
我对内存消耗(或泄漏)的假设是:调用registerprocessingtimer会创建一个保存在内存中的条目,因为有很多ip地址,所以我会有很多条目,而且内存消耗总是在增加?
我是否应该删除ontimer解决方案并仅使用statettlconfig(我使用ontimer方法是因为在statettlconfig中,每次我更新state时,它也会更新ttls,从而在我的应用程序中创建无效数据)
国家管理范例

// EXAMPLE FOR STATETTLCONFIG
public class State1 extends KeyedProcessFunction<Tuple, ..., ...>{

    private transient ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) throws Exception{

        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(2))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupInBackground()
                .build();

        ValueStateDescriptor<Integer> valueStateDesc = new ValueStateDescriptor<Integer>(
               ..
        );
        valueStateDesc.enableTimeToLive(ttlConfig);

        state = getRuntimeContext().getState(valueStateDesc);
    }

    @Override
    public void processElement(LogObject value, Context ctx, Collector<LogObject> out) throws Exception{
        Integer stateVal = valueState.value();
        // do something and update state
    }
}

// EXAMPLE FOR ONTIMER METHOD

public class State2 extends KeyedProcessFunction<Tuple, ..., ...> {

    private transient ValueState<Integer> state;

    @Override
    public void open(Configuration parameters){

        ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>(
                ...;

        state = getRuntimeContext().getState(stateDesc);
    }

    @Override
    public void processElement(LogObject value, Context ctx, Collector<LogObject> out) throws Exception{
        Integer stateVal = state.value();

        if (stateVal == null)
        {
            stateVal = 0;
            ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 5 MINS);
        }
        stateVal ++;
        // do something and update state
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<LogObject> out)
    {
        stateVal.clear();
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题