timesormore的行为

qxgroojn  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(271)

我想找出接下来发生的事情的模式
内部模式为:
键“sensorarea”的值相同。
键“customerid”的值不同。
在5秒内。
这种模式需要
仅当前一个事件发生3次或以上时发出“警报”。
我写了一些东西,但我肯定它不完整。
两个问题
当我处于“下一个”模式时,我需要访问上一个事件字段,如果不使用ctx命令,怎么做呢,因为它很重。。
我的代码带来了奇怪的结果-这是我的输入

我的输出是

3> {first=[Customer[timestamp=50,customerId=111,toAdd=2,sensorData=33]], second=[Customer[timestamp=100,customerId=222,toAdd=2,sensorData=33], Customer[timestamp=600,customerId=333,toAdd=2,sensorData=33]]}

尽管我期望的输出应该是所有前六个事件(用户111/222和传感器分别是33、44和55)

Pattern<Customer, ?> sameUserDifferentSensor = Pattern.<Customer>begin("first", skipStrategy)
            .followedBy("second").where(new IterativeCondition<Customer>() {
                @Override
                public boolean filter(Customer currCustomerEvent, Context<Customer> ctx) throws Exception {
                    List<Customer> firstPatternEvents = Lists.newArrayList(ctx.getEventsForPattern("first"));
                    int i = firstPatternEvents.size();
                    int currSensorData = currCustomerEvent.getSensorData();
                    int prevSensorData = firstPatternEvents.get(i-1).getSensorData();
                    int currCustomerId = currCustomerEvent.getCustomerId();
                    int prevCustomerId = firstPatternEvents.get(i-1).getCustomerId();
                    return currSensorData==prevSensorData && currCustomerId!=prevCustomerId;
                }
            })
            .within(Time.seconds(5))
            .timesOrMore(3);

    PatternStream<Customer> sameUserDifferentSensorPatternStream = CEP.pattern(customerStream, sameUserDifferentSensor);
    DataStream<String> alerts1 = sameUserDifferentSensorPatternStream.select((PatternSelectFunction<Customer, String>) Object::toString);
mhd8tkvw

mhd8tkvw1#

如果您首先按sensorarea键控流,您将有一个更轻松的时间。你将在流上进行模式匹配,其中所有的事件都是针对一个传感器区域的,这将使模式更容易表达,并且匹配更有效。
您不能避免使用迭代条件和ctx,但是在为流设置键控之后,成本应该更低。
另外,您的代码示例与文本描述不匹配。文本显示“5秒内”和“3次或更多次”,而代码 within(Time.seconds(2)) 以及 timesOrMore(2) .

相关问题