我正在编写一个简单的示例,使用最新的github版本1.1-snapshot在flink中测试cep的新scalaapi。
模式只是对值的检查,并输出单个字符串作为每个匹配模式的结果。代码如下:
val pattern : Pattern[(String, Long, Int), _] = Pattern.begin("start").where(_._3 < 4)
val cepEventAlert = CEP.pattern(streamingAlert, pattern)
def selectFn(pattern : mutable.Map[String, (String, Long, Int)]): String = {
val startEvent = pattern.get("start").get
"Alerta:"+startEvent._1+": Pattern"
}
val patternStreamSelected = cepEventAlert.select(selectFn(_))
patternStreamSelected.print()
它在1.1-snapshot下编译和运行,没有问题,但是jobmanager输出没有显示该print()的迹象。即使放松模式条件,只设置一个“开始”(接受所有事件)也不会返回任何结果。
另外,在尝试添加阶段时,代码无法编译。如果我将模式更改为(查找第3个字段小于4的两个连续事件):
Pattern.begin("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))
然后编译器抛出:
error: missing parameter type for expanded function ((x$4) => x$4._3.$less(4))
在“开始”阶段之后的第一个where()上显示错误。我尝试显式设置参数类型:
(x: (String, Long, Int)) => x._3 < 4
这样它就可以再次编译,但是当它在flink上运行时,就不会显示任何输出。streamingalert是一个scala数据流[(string,long,int)],在代码的其他部分,我可以使用 _._ < 4
没有问题,输出似乎正确。
1条答案
按热度按时间uklbhaso1#
这个
print()
流api中的api调用不会触发紧急执行。你还得打电话env.execute()
在你节目的最后。在定义模式时,应该在某个地方提供事件类型。要么像以前那样做,要么通过的类型参数来做
begin
: