如何使用flink-cep库检测语言模式?
例如:假设设备有一些问题,所以它会连续地发布诸如on、off之类的值。如果问题持续30分钟,如何使用cep检测模式。我在下面提到的一些示例数据。
OFF 16/08/18 11:38
ON 16/08/18 11:38
OFF 16/08/18 11:38
ON 16/08/18 11:37
OFF 16/08/18 11:37
ON 16/08/18 11:36
OFF 16/08/18 11:36
OFF 16/08/18 11:36
ON 16/08/18 11:36
OFF 16/08/18 11:35
ON 16/08/18 11:35
ON 16/08/18 11:34
OFF 16/08/18 11:34
1条答案
按热度按时间ilmyapht1#
如果您的流是按时间排序的(只需要为每个单独的设备对流进行排序),那么您可以轻松地转换流以使分析更容易。一
RichFlatMapFunction
这样会将开关事件序列转换为状态更改事件序列:现在问题已经简化为确定流在一段时间间隔内是否有一些更改事件。这可以很容易地完成滑动窗口,或者你可以使用cep如果你喜欢。
你也可以完全用cep来做。从概念上讲,您可以按以下方式处理此问题:
定义一个与on+off匹配的单独模式+
然后定义一个模式组,每当在某个时间间隔内出现n次时,该模式组就与该开/关模式匹配