我面临的问题是,我无法在scala中对单个cep模式执行求和。我想检测特定customerid的sum何时大于6100。我正在向cep.pattern(…)提供一个键控流。我在下面提供了构建模式的代码。
val pattern1 =Pattern.begin[GenericRecord]("start").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore
我的输入是avro格式的,flink正在使用Kafka的输入。输入如下-:
{"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
{"trasanction_id":199,"customer_id":27,"datetime":"2017-09-01 12:36:05","amount":2399,"state":"FAILED"}
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}```
但是,下面的代码在使用两种模式时运行良好-:
val pattern1=Pattern.begin[GenericRecord]("start").followedBy("middle").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore
1条答案
按热度按时间busg9geu1#
getEventsForPattern
返回已与模式匹配的值。让我们分析一下客户27
. 处理事件时您的第一个代码段拒绝此消息,因为它不满足以下条件:
sum + amount = 0 + 6094 < 6100
. 处理时你的情况将再次检查
0 + 547 > 6100
这就是为什么你看不到输出。在您的第二个示例中,您正在使用
followedBy
运算符,表示要处理成对的元素。无条件接受第一笔交易(因为您不包括where
操作员)现在它将由ctx.getEventsForPattern("start")
打电话。我希望你了解这个代码的行为。CEP
主要用于发现数据中的模式,而不是聚合它们。你的问题可以通过窗口化和过滤来解决-不需要使用CEP
在这里。