我想找出接下来发生的事情的模式
内部模式为:
键“sensorarea”的值相同。
键“customerid”的值不同。
在5秒内。
这种模式需要
仅当前一个事件发生3次或以上时发出“警报”。
我写了一些东西,但我肯定它不完整。
两个问题
当我处于“下一个”模式时,我需要访问上一个事件字段,如果不使用ctx命令,怎么做呢,因为它很重。。
我的代码带来了奇怪的结果-这是我的输入
我的输出是
3> {first=[Customer[timestamp=50,customerId=111,toAdd=2,sensorData=33]], second=[Customer[timestamp=100,customerId=222,toAdd=2,sensorData=33], Customer[timestamp=600,customerId=333,toAdd=2,sensorData=33]]}
尽管我期望的输出应该是所有前六个事件(用户111/222和传感器分别是33、44和55)
Pattern<Customer, ?> sameUserDifferentSensor = Pattern.<Customer>begin("first", skipStrategy)
.followedBy("second").where(new IterativeCondition<Customer>() {
@Override
public boolean filter(Customer currCustomerEvent, Context<Customer> ctx) throws Exception {
List<Customer> firstPatternEvents = Lists.newArrayList(ctx.getEventsForPattern("first"));
int i = firstPatternEvents.size();
int currSensorData = currCustomerEvent.getSensorData();
int prevSensorData = firstPatternEvents.get(i-1).getSensorData();
int currCustomerId = currCustomerEvent.getCustomerId();
int prevCustomerId = firstPatternEvents.get(i-1).getCustomerId();
return currSensorData==prevSensorData && currCustomerId!=prevCustomerId;
}
})
.within(Time.seconds(5))
.timesOrMore(3);
PatternStream<Customer> sameUserDifferentSensorPatternStream = CEP.pattern(customerStream, sameUserDifferentSensor);
DataStream<String> alerts1 = sameUserDifferentSensorPatternStream.select((PatternSelectFunction<Customer, String>) Object::toString);
1条答案
按热度按时间mhd8tkvw1#
如果您首先按sensorarea键控流,您将有一个更轻松的时间。你将在流上进行模式匹配,其中所有的事件都是针对一个传感器区域的,这将使模式更容易表达,并且匹配更有效。
您不能避免使用迭代条件和ctx,但是在为流设置键控之后,成本应该更低。
另外,您的代码示例与文本描述不匹配。文本显示“5秒内”和“3次或更多次”,而代码
within(Time.seconds(2))
以及timesOrMore(2)
.