如何使用flink cep创建批处理或幻灯片窗口?

ggazkfy8  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(305)

我只是从 Flink CEP 我来自 Esper CEP 引擎。你可能(或不)知道,在 Esper 使用它们的语法( EPL )您可以创建 batch 或者 slide 窗口,将这些窗口中的事件分组,并允许您将这些事件与函数(avg、max、min…)一起使用。
例如,使用以下模式可以创建5秒的批处理窗口,并计算属性的平均值 price 在所有的 Stock 在指定窗口中接收到的事件。

select avg(price) from Stock#time_batch(5 sec)

问题是我想知道如何实现这一点 Flink CEP . 我知道,也许 Flink CEP 是不同的,所以实现这一点的方法可能不像 Esper CEP .
我已经看了一些关于时间窗口的文档,但是我不能同时实现这个窗口 Flink CEP . 因此,给定以下代码:

DataStream<Stock> stream = ...; // Consume events from Kafka

// Filtering events with negative price
Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start")
            .where(
                    new SimpleCondition<Stock>() {
                        public boolean filter(Stock event) {
                            return event.getPrice() >= 0;
                        }
                    }
            );

PatternStream<Stock> patternStream = CEP.pattern(stream, pattern);

/**
  CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
  I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
  GREATER THAN A THREESHOLD, AN ALERT IS DETECTED

  return avg(allEventsInWindow.getPrice()) > 1;

* /

DataStream<Alert> result = patternStream.select(
            new PatternSelectFunction<Stock, Alert>() {
                @Override
                public Alert select(Map<String, List<Stock>> pattern) throws Exception {
                    return new Alert(pattern.toString());
                }
            }
    );

如何创建一个窗口,从收到的第一个窗口开始,在5秒内计算以下事件的平均值。例如:

t = 0 seconds 
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds     (...end of batch window...)
Avg = 1.5 => Alert detected!

5秒后的平均值为1.5,将触发警报。我该如何编写代码?
谢谢!

7lrncoxx

7lrncoxx1#

在flink的cep库中,这种行为是无法表达的。我宁愿推荐用Flink的 DataStream 或表api来计算平均值。基于此,您可以再次使用cep生成其他事件。

final DataStream<Stock> input = env
    .fromElements(
            new Stock(1L, 1.0),
            new Stock(2L, 2.0),
            new Stock(3L, 1.0),
            new Stock(4L, 2.0))
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Stock>(Time.seconds(0L)) {
        @Override
        public long extractTimestamp(Stock element) {
            return element.getTimestamp();
        }
    });

final DataStream<Double> windowAggregation = input
    .timeWindowAll(Time.milliseconds(2))
    .aggregate(new AggregateFunction<Stock, Tuple2<Integer, Double>, Double>() {
        @Override
        public Tuple2<Integer, Double> createAccumulator() {
            return Tuple2.of(0, 0.0);
        }

        @Override
        public Tuple2<Integer, Double> add(Stock value, Tuple2<Integer, Double> accumulator) {
            return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getValue());
        }

        @Override
        public Double getResult(Tuple2<Integer, Double> accumulator) {
            return accumulator.f1 / accumulator.f0;
        }

        @Override
        public Tuple2<Integer, Double> merge(Tuple2<Integer, Double> a, Tuple2<Integer, Double> b) {
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    });

final DataStream<Double> result = windowAggregation.filter((FilterFunction<Double>) value -> value > THRESHOLD);

相关问题