内存消耗总是在增加

myzjeezk  于 2021-07-15  发布在  Flink
关注(0)|答案(0)|浏览(280)

我当前的flink应用程序在3个节点上运行48个任务槽。我还使用rocksdb作为状态管理(我不关心flink中的保存点和检查点机制,我只是创建了一个状态,几乎5分钟(ttl)
但是,所有节点的内存消耗总是在增加,我必须通过stop-cluster.sh停止flink应用程序,然后重新启动。
我有许多基于客户端ip地址的keyedstreams。每天都有数百万用户访问我的网站。
一些键控流正在使用 StateTtlConfig 当其他人使用 onTimer 机制。
我对内存消耗(或泄漏)的假设是:调用registerprocessingtimer会创建一个保存在内存中的条目,因为有很多ip地址,所以我会有很多条目,而且内存消耗总是在增加?
我是否应该删除ontimer解决方案并仅使用statettlconfig(我使用ontimer方法是因为在statettlconfig中,每次我更新state时,它也会更新ttls,从而在我的应用程序中创建无效数据)
国家管理范例

  1. // EXAMPLE FOR STATETTLCONFIG
  2. public class State1 extends KeyedProcessFunction<Tuple, ..., ...>{
  3. private transient ValueState<Integer> state;
  4. @Override
  5. public void open(Configuration parameters) throws Exception{
  6. StateTtlConfig ttlConfig = StateTtlConfig
  7. .newBuilder(Time.minutes(2))
  8. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  9. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  10. .cleanupInBackground()
  11. .build();
  12. ValueStateDescriptor<Integer> valueStateDesc = new ValueStateDescriptor<Integer>(
  13. ..
  14. );
  15. valueStateDesc.enableTimeToLive(ttlConfig);
  16. state = getRuntimeContext().getState(valueStateDesc);
  17. }
  18. @Override
  19. public void processElement(LogObject value, Context ctx, Collector<LogObject> out) throws Exception{
  20. Integer stateVal = valueState.value();
  21. // do something and update state
  22. }
  23. }
  24. // EXAMPLE FOR ONTIMER METHOD
  25. public class State2 extends KeyedProcessFunction<Tuple, ..., ...> {
  26. private transient ValueState<Integer> state;
  27. @Override
  28. public void open(Configuration parameters){
  29. ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>(
  30. ...;
  31. state = getRuntimeContext().getState(stateDesc);
  32. }
  33. @Override
  34. public void processElement(LogObject value, Context ctx, Collector<LogObject> out) throws Exception{
  35. Integer stateVal = state.value();
  36. if (stateVal == null)
  37. {
  38. stateVal = 0;
  39. ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 5 MINS);
  40. }
  41. stateVal ++;
  42. // do something and update state
  43. }
  44. @Override
  45. public void onTimer(long timestamp, OnTimerContext ctx, Collector<LogObject> out)
  46. {
  47. stateVal.clear();
  48. }
  49. }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题