对数据流数据进行分类或分组,并分别使用cep进行处理

mwg9r5ms  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(371)

假设我有一个数据流

x:1, y:2 , z:3 , x:7 , y:-1, z:0, z:3 , z:2, y:3 ,x: 2 ,y:6

我该怎么说 x,y,z 在他们自己的桶里,应用我的cep规则。

x:1, x:7,x: 2 
y:2, y:-1, y:3 , y:6
z:3, z:0 , z:3, z:2

或者换个说法。如何将流拆分为这些类别(每个x、y、z对应一个流)。我会得到3个子流有自己的cep处理。
这里的挑战是,x,y,z不是预先定义的,所以我不能使用if或switch语句预先创建流和赋值。
编辑:模式是这样的,“如果x值在过去10分钟内在0-8之间

9ceoxa92

9ceoxa921#

这是通过在category属性上“键入”流来完成的。
如果你有 DataStream[(String, Int)] 如下所示:

val yourStream: DataStream[(String, Int)] = ???
val yourPattern: Pattern = ???

// key by String attribute
val keyedStream = yourStream.keyBy(_._1) 
// apply pattern on keyed stream 
val patternStream: PatternStream = CEP.pattern(keyedStream, yourPattern)

模式将针对键控属性的每个不同值进行求值。

相关问题