cep迭代条件

ylamdve6  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(370)

我面临的问题是,我无法在scala中对单个cep模式执行求和。我想检测特定customerid的sum何时大于6100。我正在向cep.pattern(…)提供一个键控流。我在下面提供了构建模式的代码。

val pattern1 =Pattern.begin[GenericRecord]("start").where((v,ctx) => {

      lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum

      print((sum + v.get("amount").toString.toLong).toString)
     //print(sum+v.get("amount").toString.toLong>6100)
      //println(v.get("customer_id").toString+v.get("amount").toString+" , ")
      (sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
    }).oneOrMore

我的输入是avro格式的,flink正在使用Kafka的输入。输入如下-:

{"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
{"trasanction_id":199,"customer_id":27,"datetime":"2017-09-01 12:36:05","amount":2399,"state":"FAILED"}
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}```

但是,下面的代码在使用两种模式时运行良好-:

val pattern1=Pattern.begin[GenericRecord]("start").followedBy("middle").where((v,ctx) => {

      lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum

      print((sum + v.get("amount").toString.toLong).toString)
     //print(sum+v.get("amount").toString.toLong>6100)
      //println(v.get("customer_id").toString+v.get("amount").toString+" , ")
      (sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
    }).oneOrMore
busg9geu

busg9geu1#

getEventsForPattern 返回已与模式匹配的值。让我们分析一下客户 27 . 处理事件时

{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}

您的第一个代码段拒绝此消息,因为它不满足以下条件: sum + amount = 0 + 6094 < 6100 . 处理时

{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}

你的情况将再次检查 0 + 547 > 6100 这就是为什么你看不到输出。
在您的第二个示例中,您正在使用 followedBy 运算符,表示要处理成对的元素。无条件接受第一笔交易(因为您不包括 where 操作员)现在它将由 ctx.getEventsForPattern("start") 打电话。我希望你了解这个代码的行为。 CEP 主要用于发现数据中的模式,而不是聚合它们。你的问题可以通过窗口化和过滤来解决-不需要使用 CEP 在这里。

相关问题