java—在调用execute()之后,是否可以在flink cep中添加新模式?

brtdzjyr  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(376)

我的代码如下:

  1. StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStream<MyObject> input = env.addSource(new MyCustomSource());
  3. Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start");
  4. PatternStream<MyObject> patternStream = CEP.pattern(input, pattern);

... 定义我的模式

  1. DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction());
  2. resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria));
  3. try
  4. {
  5. env.execute();
  6. }
  7. catch (Exception exception)
  8. {
  9. log.debug("Error while ", exception);
  10. }

这段代码可以工作并做我想要的事情,我得到了一个遵循我设置的模式的结果流。
我想知道的是,是否有可能将新模式应用于我稍后添加到环境中的源,从而获得与不同模式匹配的不同结果流,而无需再次调用env.execute(),因为除了新结果流之外,我还获得了冗余的旧结果流(即旧模式)多次执行)?

gopyfrb3

gopyfrb31#

目前,flink的cep库不支持开箱即用的动态模式更改。因此,一旦定义了模式并开始工作,它将只处理这个定义的模式。
但是,您可以编写自己的操作符来实现 TwoInputStreamOperator 接口,在一个输入模式定义上接收流记录,在另一个输入模式定义上接收流记录(类似于coflatmap函数)。对于每个新模式,您都必须编译一个新的 NFA 并将任何新的传入流元素 NFA 也。这样,你就可以达到你的预期行为。
将来,我们很可能会将此功能添加到flink的cep库中。

相关问题