我能和Flink·塞普做一场懒洋洋的比赛吗

0mkxixxg  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(437)

我想用flinkcep只做一个'懒惰'的模式匹配。我该怎么做?e、 我有一个输入流acb,我想在一个c上匹配,只得到3个匹配,而不是6个匹配。
我创建了以下示例来说明我的问题。

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

case class MyEvent(id: Int, kind: String, value: String)
case class MyAggregatedEvent(id: Int, concatenatedValue: String)

val eventStream = env.fromElements(
  MyEvent(1, "A", "1"), MyEvent(1, "C", "1"),
  MyEvent(1, "A", "2"), MyEvent(1, "B", "1"), MyEvent(1, "C", "2"),
  MyEvent(1, "A", "3"), MyEvent(1, "D", "2"), MyEvent(1, "C", "3"),
  MyEvent(1, "B", "3")
)

val pattern: Pattern[MyEvent, _] = Pattern
  .begin[MyEvent]("pA").where(e => e.kind == "A")
  .next("pC").where(e => e.kind == "C")
  .within(Time.seconds(5))

val patternNextStream: PatternStream[MyEvent] = CEP.pattern(eventStream.keyBy(_.id), pattern)

val outNextStream: DataStream[MyAggregatedEvent] = patternNextStream.flatSelect {
  (pattern: scala.collection.mutable.Map[String, MyEvent], collector: Collector[MyAggregatedEvent]) =>
    val partA = pattern.get("pA").get
    val partC = pattern.get("pC").get

    collector.collect(MyAggregatedEvent(partA.id, partA.value + "=>" + partC.value))
}
outNextStream.print()

env.execute("Experiment")

这将提供以下输出:
我的聚合事件(1,1=>1)
当我将模式更改为:

val pattern: Pattern[MyEvent, _] = Pattern
  .begin[MyEvent]("pA").where(e => e.kind == "A")
  .followedBy("pC").where(e => e.kind == "C")
  .within(Time.seconds(5))

然后打印以下内容:
我的聚合事件(1,1=>1)
我的聚合事件(1,1=>2)
我的聚合事件(1,2=>2)
我的聚合事件(1,1=>3)
我的聚合事件(1,2=>3)
我的聚合事件(1,3=>3)
如何创建只与每个事件匹配一次的模式,以便输出:
我的聚合事件(1,1=>1)
我的聚合事件(1,2=>2)
我的聚合事件(1,3=>3)

7d7tgy0s

7d7tgy0s1#

目前flink的cep库还不支持这一点。匹配的语义还不能控制。我想加一个 MATCH_ALL 还有一根火柴 MATCH_FIRST 开始的模式。这个 MATCH_FIRST 一旦看到完全匹配的序列,就会丢弃所有中间状态。这应该涵盖您的用例。

相关问题