Allowed lateness for a custom trigger on a global window

Posted by ulydmbyx on 2021-06-24 in Flink

I have created a custom trigger and a process function for my event stream.

DataStream<DynamoDBRow> dynamoDBRows =
    sensorEvents
        .keyBy("id")
        .window(GlobalWindows.create())
        .trigger(new MyCustomTrigger())
        .allowedLateness(Time.minutes(1)) // Note: meant to catch events that arrive after the trigger fires
        .process(new MyCustomWindowProcessFunction());

My trigger is based on an event parameter: once the end-of-event signal is received, MyCustomWindowProcessFunction() is applied to the elements of the window.

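import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

@Slf4j
public class MyCustomTrigger extends Trigger<SensorEvent, GlobalWindow> {

  // Fire the window function and discard the window contents as soon as the
  // end-of-event marker arrives; otherwise keep accumulating elements.
  @Override
  public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    if (element.isEventEnd()) {
      return TriggerResult.FIRE_AND_PURGE;
    }

    return TriggerResult.CONTINUE;
  }

  // No timers are registered, so the timer callbacks never change the window.
  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  // Nothing to clean up: this trigger keeps no state and registers no timers.
  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}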

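A few sensor events may arrive late, even after the trigger has fired. So I added .allowedLateness(Time.minutes(1)) to make sure these events are not missed during processing.
As far as I can tell, allowed lateness is not working.
After going through the documentation, I found this: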

How can I apply allowedLateness to a global window?
Note: I also tried setting the environment's time characteristic:

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
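For context on why allowedLateness seems to do nothing here: the Flink documentation notes that with the GlobalWindows assigner no data is ever considered late, because the end timestamp of the global window is Long.MAX_VALUE. As far as I understand Flink's WindowOperator, lateness is also only evaluated for event-time assigners, and GlobalWindows is not one. The Flink-free sketch below models that check. LatenessModel, cleanupTime and isWindowLate are hypothetical names, and the logic is a simplified reading of the operator, not its actual source.

```java
/**
 * Simplified model (an assumption, not Flink's actual code) of the check that
 * decides whether a window, and therefore an element for it, is late.
 */
public class LatenessModel {

    /** A global window never ends: its max timestamp is Long.MAX_VALUE. */
    public static final long GLOBAL_WINDOW_END = Long.MAX_VALUE;

    /** End of the window plus allowed lateness, clamped to Long.MAX_VALUE on overflow. */
    public static long cleanupTime(long windowEnd, long allowedLatenessMillis) {
        long cleanup = windowEnd + allowedLatenessMillis;
        // On overflow the sum wraps to a negative number; clamp it instead.
        return cleanup >= windowEnd ? cleanup : Long.MAX_VALUE;
    }

    /**
     * Lateness is only evaluated for event-time assigners (assumption based on
     * WindowOperator); for other assigners this always returns false.
     */
    public static boolean isWindowLate(boolean eventTimeAssigner, long windowEnd,
                                       long allowedLatenessMillis, long watermark) {
        return eventTimeAssigner && cleanupTime(windowEnd, allowedLatenessMillis) <= watermark;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // Event-time window ending at t=1000 with one minute of allowed lateness:
        // still accepting elements at watermark 60999, late from 61000 on.
        System.out.println(isWindowLate(true, 1_000L, oneMinute, 60_999L)); // false
        System.out.println(isWindowLate(true, 1_000L, oneMinute, 61_000L)); // true
        // Global window: cleanup time saturates at Long.MAX_VALUE and the
        // assigner is not event time, so nothing is ever late.
        System.out.println(isWindowLate(false, GLOBAL_WINDOW_END, oneMinute, Long.MAX_VALUE)); // false
    }
}
```

In other words, adding allowed lateness to an end timestamp that is already Long.MAX_VALUE cannot move it any later, so the delay has to come from the trigger's own firing logic or from a different operator.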

Update: 20-02-2020
I am currently considering the following approach (not working so far):

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {

  private final long allowedLatenessMillis;

  public JourneyTrigger(Time allowedLateness) {
    this.allowedLatenessMillis = allowedLateness.toMilliseconds();
  }

  @Override
  public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    if (element.isEventEnd()) {
      log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
      // The timestamp here is wall-clock time, but an event-time timer only
      // fires once the watermark reaches it.
      ctx.registerEventTimeTimer(System.currentTimeMillis() + allowedLatenessMillis);
    }

    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    log.info("onEventTime called at " + System.currentTimeMillis());
    return TriggerResult.FIRE_AND_PURGE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}
rekjcdws, answer #1

In the end, I met my requirement with the following custom trigger:

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {

  private final long allowedLatenessMillis;

  public JourneyTrigger(Time allowedLateness) {
    this.allowedLatenessMillis = allowedLateness.toMilliseconds();
  }

  @Override
  public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    if (element.isEventEnd()) {
      log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
      // Fire allowedLatenessMillis after the end marker (in processing time),
      // so stragglers that arrive in the meantime are still in the window.
      ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + allowedLatenessMillis);
    }

    return TriggerResult.CONTINUE;
  }

  // The timer is due: emit everything collected so far and empty the window.
  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    log.info("onProcessingTime called at " + System.currentTimeMillis());
    return TriggerResult.FIRE_AND_PURGE;
  }

  // Event-time timers are not used by this trigger.
  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}

Also, in the Driver.java class, set the environment's time characteristic:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
hivapdat, answer #2

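To be honest, I don't see a reason to use a GlobalWindow here. You can use a KeyedProcessFunction that does the same thing as your Trigger: it collects all elements from the start of the event to its end in a ListState, and when you receive isEventEnd()==true, you simply schedule an EventTime timer that fires one minute later and emits the results collected in the ListState.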
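The KeyedProcessFunction suggestion can be illustrated with the Flink-free simulation below, so it runs without a cluster. It is a sketch under stated assumptions, not a drop-in implementation: JourneyBufferModel, Event and advanceWatermark are hypothetical names, a per-key List plays the role of ListState, a sorted map plays the role of the timer service, and advanceWatermark stands in for watermarks arriving. In real Flink code the same steps would be state.add(value) and ctx.timerService().registerEventTimeTimer(ctx.timestamp() + delay) in processElement, then iterating state.get(), calling out.collect(...) and state.clear() in onTimer. That assumes the events carry timestamps and watermarks are generated; otherwise ctx.timestamp() is null and event-time timers never fire.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Flink-free simulation of the KeyedProcessFunction approach: buffer each key's
 * events, arm an event-time timer on the end marker, flush when the timer fires.
 */
public class JourneyBufferModel {

    /** Hypothetical stand-in for SensorEvent. */
    public static final class Event {
        public final String id;
        public final long timestamp;
        public final boolean eventEnd;

        public Event(String id, long timestamp, boolean eventEnd) {
            this.id = id;
            this.timestamp = timestamp;
            this.eventEnd = eventEnd;
        }
    }

    private final long delayMillis;
    // key -> buffered events (plays the role of ListState<SensorEvent>)
    private final Map<String, List<Event>> buffers = new HashMap<>();
    // timer timestamp -> keys to flush at that time (plays the role of the timer service)
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    // every flushed journey, in firing order (plays the role of the Collector)
    private final List<List<Event>> output = new ArrayList<>();

    public JourneyBufferModel(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Like processElement(): buffer the event; on the end marker, arm a timer. */
    public void processElement(Event e) {
        buffers.computeIfAbsent(e.id, k -> new ArrayList<>()).add(e);
        if (e.eventEnd) {
            timers.computeIfAbsent(e.timestamp + delayMillis, t -> new ArrayList<>()).add(e.id);
        }
    }

    /** Like a watermark arriving: run the onTimer() logic for every timer at or below it. */
    public void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (String key : timers.pollFirstEntry().getValue()) {
                // Emit the buffered events and clear the key's state.
                List<Event> journey = buffers.remove(key);
                if (journey != null) {
                    output.add(journey);
                }
            }
        }
    }

    public List<List<Event>> output() {
        return output;
    }

    public static void main(String[] args) {
        JourneyBufferModel model = new JourneyBufferModel(60_000L);
        model.processElement(new Event("s1", 0L, false));
        model.processElement(new Event("s1", 10_000L, true));  // end marker: timer at 70000
        model.processElement(new Event("s1", 30_000L, false)); // straggler, still buffered
        model.advanceWatermark(69_999L);                       // timer not due yet
        System.out.println(model.output().size());             // 0
        model.advanceWatermark(70_000L);                       // timer fires, journey flushed
        System.out.println(model.output().get(0).size());      // 3
    }
}
```

Keeping the timers in a sorted map means they fire in timestamp order as the watermark advances, which mirrors how event-time timers behave. An event that arrives after its key has been flushed simply starts a new buffer, just as it would start a new ListState in the real function.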
