apache flink-x分钟内未收到数据时发送事件

p5fdfcr1  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(521)

我如何用flink的datastreamapi实现一个操作符,当一段时间内没有从流接收到数据时,该操作符会发送一个事件?

zaqlnxep

zaqlnxep1#

你可以用一个自定义的触发函数设置一个时间窗口。在trigger函数中,每次接收到事件时,“onevent”方法都会将processingtimetrigger设置为“currenttime+desiredtimedelay”。然后,当一个新事件出现时,删除先前设置的触发器并创建一个新的触发器。如果一个事件在系统时间是processingtimetrigger上的时间之前没有出现,它将触发,并且窗口将被处理。即使没有事件发生,要处理的事件列表也将是空的。

x3naxklr

x3naxklr2#

可以使用 ProcessFunction .

DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L);

input
  // use keyBy to have keyed state. 
  // NullByteKeySelector will move all data to one task. You can also use other keys
  .keyBy(new NullByteKeySelector())
  // use process function with 60 seconds timeout
  .process(new TimeOutFunction(60 * 1000));

这个 TimeOutFunction 定义如下。在本例中,它使用处理时间。

public static class TimeOutFunction extends ProcessFunction<Long, Boolean> {

  // delay after which an alert flag is thrown
  private final long timeOut;
  // state to remember the last timer set
  private transient ValueState<Long> lastTimer;

  public TimeOutFunction(long timeOut) {
    this.timeOut = timeOut;
  }

  @Override
  public void open(Configuration conf) {
    // setup timer state
    ValueStateDescriptor<Long> lastTimerDesc = 
      new ValueStateDescriptor<Long>("lastTimer", Long.class);
    lastTimer = getRuntimeContext().getState(lastTimerDesc);
  }

  @Override
  public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception {
    // get current time and compute timeout time
    long currentTime = ctx.timerService().currentProcessingTime();
    long timeoutTime = currentTime + timeOut;
    // register timer for timeout time
    ctx.timerService().registerProcessingTimeTimer(timeoutTime);
    // remember timeout time
    lastTimer.update(timeoutTime);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception {
    // check if this was the last timer we registered
    if (timestamp == lastTimer.value()) {
      // it was, so no data was received afterwards.
      // fire an alert.
      out.collect(true);
    }
  }
}

相关问题