如何向flink cep数据流添加新事件?

hsgswve4  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(420)

我使用flink1.5.2来解决cep问题。
我的数据来自一个列表,当系统运行时,其他进程将向该列表添加新的事件对象。它不是套接字或网络消息。我一直在读官方网站的例子。以下是我想我应该做的步骤。
使用env.fromcollection(list)创建数据流;
定义图案
使用cep.pattern(data\u stream,pattern)获取patternstream
使用pattern\u stream.select(…implement select interface…)将复杂事件结果作为数据流获取
但我的输入流应该是无限的。我在datastream<>对象中找不到任何add()方法。我如何做到这一点?另外,我是否需要告诉datastream<>何时清理过时的事件?

xnifntxz

xnifntxz1#

当使用预先固定的有界输入集时,集合只适合作为flink的输入源,例如编写测试或只是进行实验时。如果你想要一个无限流,你需要选择一个不同的源,比如一个套接字或者一个像kafka这样的消息队列系统。
插座很容易进行实验。在linux和macos系统上,您可以使用

nc -lk 9999

在端口9999上创建flink可以绑定的套接字,以及作为输入提供的任何内容 nc (netcat)将流到你的flink作业中,一次一行。netcat也可用于windows,但不是预先安装的。
但是,您不应该计划在生产环境中使用套接字,因为它们不能重绕(这对于在故障恢复期间使用flink获得准确的结果至关重要)。

相关问题