ApacheFlinkCEP如何基于事件值传递时间窗口?

y3bcpkx1  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(289)
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("error");
        }
    }).followedBy("end").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

有什么办法我可以代替你吗 Time.seconds(10)value.getSomeTimeField() 我穿过 Event ?

mm9b1k5b

mm9b1k5b1#

我猜你想以活动时间的方式工作。有关它的更多信息,您可以查看这个文档和这个关于如何从元素中提取时间戳的部分。
在你的例子中,你可以这样做:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {

    @Override
    public long extractAscendingTimestamp(MyEvent element) {
        return value.getSomeTimeField();
    }
})

CEP.pattern(input, pattern).select(...)

这样,事件将在流中自动排序,超时将应用于这两种情况下的时间字段。

相关问题