How to connect a countWindow and a timeWindow?

zbdgwd5y posted on 2021-06-24 in Flink

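I am trying to set up a stream that first applies a countWindow. The results emitted by the countWindow then need to be passed on to a separate time window. The problem is that the timeWindow does not emit any results.
I have put together a very simple piece of code that demonstrates the problem: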

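import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

env
 // Source: emits 0, 1, 2, ... at one element per second
 .addSource(new RichSourceFunction[Int] {
  @volatile private var running = true

  // Stop the emit loop when the job is cancelled
  override def cancel(): Unit = { running = false }

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
   var i = 0
   while (running) {
    println(s"Source emits element ${i}")
    ctx.collect(i)
    i = i + 1
    Thread.sleep(1000)
   }
  }
 })
 .keyBy(new KeySelector[Int, String] {
  override def getKey(value: Int): String = {
   println("getKey 1")
   "KEY1"
  }
 })
 // Sliding count window: size 2, slide 1, so it fires on every element
 .countWindow(2, 1)
 .reduce(new ReduceFunction[Int] {
  override def reduce(value1: Int, value2: Int): Int = {
   println("reduce 1")
   value1
  }
 })
 .keyBy(new KeySelector[Int, String] {
  override def getKey(value: Int): String = {
   println("getKey 2")
   "KEY2"
  }
 })
 // 5-second tumbling window; under IngestionTime this is an event-time window
 .timeWindow(Time.seconds(5))
 .reduce(new ReduceFunction[Int] {
  override def reduce(value1: Int, value2: Int): Int = {
   println("reduce 2")
   value1
  }
 })
 .print()

env.execute("countWindow followed by timeWindow")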

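With the code above, I expect one element to be printed every 5 seconds. However, that is not what happens: the actual output shows that the print function is reached only once: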

Source emits element 0
getKey 1
getKey 2
getKey 2
1> 0
Source emits element 1
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 2
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 3
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 4
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 5
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 6
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 7
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 8
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 9
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 10
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 11
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 12
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Answer from y0u0uwnf:

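Interesting example. If you change IngestionTime to ProcessingTime, the example works correctly, because timeWindow() then uses processing-time windows, which do not depend on record timestamps.
In the debugger I can see that, with IngestionTime, the StreamRecords emitted by the countWindow no longer carry valid timestamps (a window operator stamps each result with its window's maximum timestamp, which for the GlobalWindow used by countWindow is Long.MAX_VALUE), so the timeWindow, which is an event-time window in this mode, does not work properly.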
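The debugger observation above can also be checked by printing the timestamp attached to each record just before and just after the count window. This is a minimal diagnostic sketch, assuming the Flink 1.x Scala DataStream API used in the question (some of these calls are deprecated in later releases); the object name `InspectTimestamps`, the helper class `LogTimestamp` and the bounded `fromElements` input are illustrative choices, not part of the original answer.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical diagnostic job: logs the timestamp Flink attaches to each
// record before and after the countWindow, under IngestionTime.
object InspectTimestamps {

  // Pass-through ProcessFunction that prints ctx.timestamp().
  // ctx.timestamp() is a java.lang.Long and is null if a record has no timestamp.
  class LogTimestamp(label: String) extends ProcessFunction[Int, Int] {
    override def processElement(
        value: Int,
        ctx: ProcessFunction[Int, Int]#Context,
        out: Collector[Int]): Unit = {
      println(s"$label: value=$value timestamp=${ctx.timestamp()}")
      out.collect(value)
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    env.setParallelism(1)

    env
      .fromElements(0, 1, 2, 3, 4) // bounded input so the job terminates
      .process(new LogTimestamp("before countWindow"))
      .keyBy(_ => "KEY1")
      .countWindow(2, 1)
      .reduce((a, b) => a) // keep the first element, as in the question
      .process(new LogTimestamp("after countWindow"))
      .print()

    env.execute("inspect timestamps around countWindow")
  }
}
```

Under IngestionTime the "before" values are the wall-clock milliseconds assigned at the source; comparing them with the "after" values shows whether the count window's output still carries usable timestamps on your Flink version.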
To fix this, you need to re-establish timestamps and watermarks after the countWindow, like this:

...
  .countWindow(2, 1)
  .reduce(new ReduceFunction[Int] {
    override def reduce(value1: Int, value2: Int): Int = {
      println("reduce 1")
      value1
    }
  })
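  // Re-attach ingestion-time timestamps and watermarks to the count window's output
  // (IngestionTimeExtractor is in org.apache.flink.streaming.api.functions)
  .assignTimestampsAndWatermarks(new IngestionTimeExtractor[Int]())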
  .keyBy(new KeySelector[Int, String] {
    override def getKey(value: Int): String = {
      println("getKey 2")
      "KEY2"
    }
  })
  ...

A similar technique also works for event time.
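For event time, the timestamp has to come from the element itself, so the question's bare `Int` is not enough. The sketch below assumes a hypothetical `Reading(value, eventTime)` record and the `WatermarkStrategy` API available from Flink 1.11; the names, the 1-second out-of-orderness bound and the summing reduce in the time window are illustrative assumptions. As in the ingestion-time fix, timestamps and watermarks are assigned again right after the count window.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type carrying its own event time (epoch milliseconds).
case class Reading(value: Int, eventTime: Long)

object EventTimeAfterCountWindow {

  // Watermark strategy used at the source and again after the countWindow.
  def strategy: WatermarkStrategy[Reading] =
    WatermarkStrategy
      .forBoundedOutOfOrderness[Reading](Duration.ofSeconds(1))
      .withTimestampAssigner(new SerializableTimestampAssigner[Reading] {
        override def extractTimestamp(r: Reading, recordTimestamp: Long): Long = r.eventTime
      })

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Needed on Flink 1.11; event time is already the default from 1.12 on.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Bounded input: one reading per second of event time, 0 s to 11 s.
    val readings = (0 until 12).map(i => Reading(i, i * 1000L))

    env
      .fromElements(readings: _*)
      .assignTimestampsAndWatermarks(strategy)
      .keyBy(_ => "KEY1")
      .countWindow(2, 1)
      // Keep the newer reading so its eventTime survives the count window.
      .reduce((a, b) => if (a.eventTime >= b.eventTime) a else b)
      // Re-extract event timestamps from the element and regenerate watermarks.
      .assignTimestampsAndWatermarks(strategy)
      .keyBy(_ => "KEY2")
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .reduce((a, b) => Reading(a.value + b.value, math.max(a.eventTime, b.eventTime)))
      .print()

    env.execute("countWindow followed by event-time window")
  }
}
```

Keeping the newer reading in the count-window reduce is what lets the second assigner recover a meaningful event time; if that reduce discarded the `eventTime` field, there would be nothing left to extract.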
