state垃圾收集在带有globalwindow的beam中

mmvthczy  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(271)

apachebeam最近通过 StateSpec 以及 @StateId 注解,部分支持apacheflink和googleclouddataflow。
我找不到任何文档说明当它与 GlobalWindow . 特别是,有没有一种方法可以使用“状态垃圾收集”机制,根据某些配置,除去一段时间内没有看到的密钥的状态,同时仍然为频繁看到的密钥保持一个单一的所有时间状态?
或者,在这种情况下使用的状态量是否会发生分歧,无法回收与一段时间内没有看到的密钥对应的状态?
我还对apacheflink或googleclouddataflow是否支持一个潜在的解决方案感兴趣。
flink和directrunners似乎有一些“stategc”的代码,但我不确定它的功能,以及在使用全局窗口时是否相关。

unftdfkk

unftdfkk1#

状态可以在窗口过期后的某个时间点由梁运行程序自动垃圾收集—当输入水印超过允许的延迟时间时,所有进一步的输入都是可丢弃的。具体细节取决于跑步者。
正如您正确确定的,全局窗口可能永远不会过期。这样就不会调用这个状态的自动集合。对于有界数据(包括外泄场景),它实际上会过期,但对于永久无界数据源,它不会过期。
如果在全局窗口中对此类数据进行有状态处理,则可以使用用户定义的计时器(通过 @TimerId , @OnTimer ,和 TimerSpec -我还没有写这些博客)在你选择的某个超时之后清除状态。如果状态表示某种类型的聚合,那么您无论如何都需要一个计时器来确保您的数据没有陷入状态。
下面是一个使用它们的快速示例:

new DoFn<Foo, Baz>() {

  private static final String MY_TIMER = "my-timer";
  private static final String MY_STATE = "my-state";

  @StateId(MY_STATE)
  private final StateSpec<ValueState<Bizzle>> =
      StateSpec.value(Bizzle.coder());

  @TimerId(MY_TIMER)
  private final TimerSpec myTimer =
      TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState,
      @TimerId(MY_TIMER) Timer myTimer) {
    bizzleState.write(...);
    myTimer.setForNowPlus(...);
  }

  @OnTimer(MY_TIMER)
  public void onMyTimer(
      OnTimerContext context,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState) {
    context.output(... bizzleState.read() ...);
    bizzleState.clear();
  }
}
vsaztqbk

vsaztqbk2#

如果您使用 GlobalWindows . 只有在使用某些非全局窗口时,水印经过窗口的末尾加上允许的延迟后,状态才会被垃圾收集。
如果你必须和他一起工作,你能做什么 GlobalWindows 是手动保持 last update timestamp . 然后定期设置一个计时器,根据当前时间检查时间戳,必要时删除状态。您可以在第一次遇到密钥时设置这个计时器(从没有时间戳状态可以看出),然后在 @OnTimer 方法。

相关问题