flink cep事件连接到后台数据流

0s7z1bwu  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(477)

我有两个数据流(eg)

ts | device | custId | temp
1 | 'device1'| 1 | 10
1 | 'device2'| 4 | 7
2 | 'device1'| 1 | 10
3 | 'device1'| 1 | 10
4 | 'device1'| 1 | 10
5 | 'device2'| 4 | 10

我已经创建了一个cep模式,我想检查4秒内的温度是否大于30。

val pattern = Pattern.begin[Device]("start")
      .where(_.sumtemp >= 30)
      .within(Time.seconds(4))

有没有一种方法可以将这个模式流的输出连接到另一个传入的数据流以得到下面的结果?。

ts | custId | morethanthiry
1 | 1 | yes
2 | 4 | no

如果能举个例子来说明这一点,我将不胜感激。

pkwftd7m

pkwftd7m1#

有多种选择。你可以用一个 coGroup 例子:

set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction());

您可以将其视为sql中的联接。
实现的一个小示例:

class MyCoGroupFunction extends RichCoGroupFunction[DataTypeOfStream1, DataTypeOfStream2, DataTypeOfOutput] {

      override def coGroup(first: DataTypeOfStream1,
                         second: DataTypeOfStream2],
                         out: DataTypeOfOutput): Unit = {

           out.collect(...)
           //your output

      }
}

如果需要,也可以使用状态。
也有其他选择加入两个流,如
联合(如果要连接的流具有相同的数据类型)
连接
coflatmap方法之间的差异很小。
看到了吗https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/ 更多信息。

相关问题