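I'm trying to set up a stream in which a countWindow is applied first. The results emitted by the countWindow then need to be passed to a separate timeWindow. The problem is that the timeWindow never emits any results.
I've put together a very simple piece of code that demonstrates the problem: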
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env
  .addSource(new RichSourceFunction[Int] {
    override def cancel(): Unit = {}
    override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
      var i = 0
      while (true) {
        println(s"Source emits element ${i}")
        ctx.collect(i)
        i = i + 1
        Thread.sleep(1000)
      }
    }
  })
  .keyBy(new KeySelector[Int, String] {
    override def getKey(value: Int): String = {
      println("getKey 1")
      "KEY1"
    }
  })
  .countWindow(2, 1)
  .reduce(new ReduceFunction[Int] {
    override def reduce(value1: Int, value2: Int): Int = {
      println("reduce 1")
      value1
    }
  })
  .keyBy(new KeySelector[Int, String] {
    override def getKey(value: Int): String = {
      println("getKey 2")
      "KEY2"
    }
  })
  .timeWindow(Time.seconds(5))
  .reduce(new ReduceFunction[Int] {
    override def reduce(value1: Int, value2: Int): Int = {
      println("reduce 2")
      value1
    }
  })
  .print()
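With the code above, I expect one element to be printed every 5 seconds. That is not what happens, however. The actual output shows that the print function is reached only once: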
Source emits element 0
getKey 1
getKey 2
getKey 2
1> 0
Source emits element 1
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 2
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 3
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 4
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 5
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 6
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 7
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 8
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 9
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 10
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 11
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 12
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
1 Answer
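Interesting example. If you change IngestionTime to ProcessingTime, the example runs correctly.
In the debugger I can see that, with IngestionTime, the StreamRecords produced by the countWindow no longer carry valid timestamps, so the timeWindow cannot work properly.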
To fix this, you need to re-establish timestamps and watermarks after the countWindow.
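The snippet that originally accompanied this answer is not preserved here, so the following is only a minimal sketch of one way to re-assign timestamps and watermarks between the two windows. It assumes a Flink 1.x release in which `timeWindow`, `setStreamTimeCharacteristic`, and `IngestionTimeExtractor` are still available. The object name `CountThenTimeWindow` and the job name are made up for illustration, and the key selectors and reduce functions are written as lambdas for brevity.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical object name, used only for this sketch.
object CountThenTimeWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    env
      .addSource(new SourceFunction[Int] {
        @volatile private var running = true
        override def cancel(): Unit = running = false
        override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
          var i = 0
          while (running) {
            ctx.collect(i) // one element per second, as in the question
            i += 1
            Thread.sleep(1000)
          }
        }
      })
      .keyBy(_ => "KEY1")
      .countWindow(2, 1)
      .reduce((value1, _) => value1)
      // Assumption: stamping each record with the current wall-clock time here
      // restores the timestamps and watermarks the downstream time window needs.
      .assignTimestampsAndWatermarks(new IngestionTimeExtractor[Int])
      .keyBy(_ => "KEY2")
      .timeWindow(Time.seconds(5))
      .reduce((value1, _) => value1)
      .print()

    env.execute("count window followed by time window")
  }
}
```

The only change relative to the pipeline in the question is the `assignTimestampsAndWatermarks` call placed between the first `reduce` and the second `keyBy`.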
A similar technique also works for event time.