我只是从 Flink CEP
我来自 Esper CEP
引擎。你可能(或不)知道,在 Esper
使用它们的语法( EPL
)您可以创建 batch
或者 slide
窗口,将这些窗口中的事件分组,并允许您将这些事件与函数(avg、max、min…)一起使用。
例如,使用以下模式可以创建5秒的批处理窗口,并计算属性的平均值 price
在所有的 Stock
在指定窗口中接收到的事件。
select avg(price) from Stock#time_batch(5 sec)
问题是我想知道如何实现这一点 Flink CEP
. 我知道,也许 Flink CEP
是不同的,所以实现这一点的方法可能不像 Esper CEP
.
我已经看了一些关于时间窗口的文档,但是我不能同时实现这个窗口 Flink CEP
. 因此,给定以下代码:
DataStream<Stock> stream = ...; // Consume events from Kafka
// Filtering events with negative price
Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start")
.where(
new SimpleCondition<Stock>() {
public boolean filter(Stock event) {
return event.getPrice() >= 0;
}
}
);
PatternStream<Stock> patternStream = CEP.pattern(stream, pattern);
/**
CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
GREATER THAN A THREESHOLD, AN ALERT IS DETECTED
return avg(allEventsInWindow.getPrice()) > 1;
* /
DataStream<Alert> result = patternStream.select(
new PatternSelectFunction<Stock, Alert>() {
@Override
public Alert select(Map<String, List<Stock>> pattern) throws Exception {
return new Alert(pattern.toString());
}
}
);
如何创建一个窗口,从收到的第一个窗口开始,在5秒内计算以下事件的平均值。例如:
t = 0 seconds
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds (...end of batch window...)
Avg = 1.5 => Alert detected!
5秒后的平均值为1.5,将触发警报。我该如何编写代码?
谢谢!
1条答案
按热度按时间7lrncoxx1#
在flink的cep库中,这种行为是无法表达的。我宁愿推荐用Flink的
DataStream
或表api来计算平均值。基于此,您可以再次使用cep生成其他事件。