我有以下用例。
有一台机器正在向Kafka发送事件流,Kafka正在接收这些事件流 CEP engine
当流数据满足条件时生成警告。
FlinkKafkaConsumer011<Event> kafkaSource = new FlinkKafkaConsumer011<Event>(kafkaInputTopic, new EventDeserializationSchema(), properties);
DataStream<Event> eventStream = env.addSource(kafkaSource);
事件pojo包含id、名称、时间和ip。
机器将向kafka发送大量数据,并且机器中有35个唯一的事件名称(如name1、name2。。。。。name35),我想检测每个事件名称组合的模式(比如name1 co occurred with name2,name1 co occurred with name3。。等等)。我一共有1225个组合。
规则pojo包含e1name和e2name。
List<Rule> ruleList -> It contains 1225 rules.
for (Rule rule : ruleList) {
Pattern<Event, ?> warningPattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
if(value.getName().equals(rule.getE1Name())) {
return true;
}
return false;
}
}).followedBy("next").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
if(value.getName().equals(rule.getE2Name())) {
return true;
}
return false;
}
}).within(Time.seconds(30));
PatternStream patternStream = CEP.pattern(eventStream, warningPattern);
}
这是在一个数据流上执行多个模式的正确方法还是有任何优化的方法来实现这一点。通过上述方法,我们得到 PartitionNotFoundException
以及 UnknownTaskExecutorException
还有记忆问题。
1条答案
按热度按时间lymgl2op1#
在我看来,实现目标不需要模式。您可以定义一个有状态Map函数到源,该函数将事件名称成对Map(最新的两个名称)。之后,将源代码窗口设置为30秒,并对源代码应用简单的wordcount示例。
有状态Map函数可以是这样的(只接受事件名称,您需要根据您的输入更改它-提取事件名称等):
事件名称对和事件计数作为元组的结果可以如下获得(写入kafka接收器?):