下面是一个简单的代码示例来说明我的问题:
case class Record( key: String, value: Int )
object Job extends App
{
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
val step1 = data.filter( record => record.value % 3 != 0 ) // introduces some data loss
val step2 = data.map( r => Record( r.key, r.value * 2 ) )
val step3 = data.map( r => Record( r.key, r.value * 3 ) )
val merged = step1.union( step2, step3 )
val keyed = merged.keyBy(0)
val windowed = keyed.countWindow( 3 )
val summed = windowed.sum( 1 )
summed.print()
env.execute("test")
}
这将产生以下结果:
Record(01,6)
Record(02,12)
Record(04,24)
Record(05,30)
正如预期的那样,不会为键“03”生成任何结果,因为计数窗口需要3个元素,而流中只有两个元素。
我想要的是某种带有超时的count窗口,这样,在某个超时之后,如果没有达到count窗口所期望的元素数,则使用现有元素生成部分结果。
对于这种行为,在我的示例中,当达到超时时将生成一条记录(03,15)。
3条答案
按热度按时间92dk7w1h1#
我认为您可以使用processfunction实现这个用例
其中有count属性和windowend属性。使用它,您可以决定何时收集数据。
我希望这对你有帮助。
e4yzc0pl2#
你也可以用一个自定义窗口来实现
Trigger
它在达到计数或超时过期时激发,有效地混合了内置的CountTrigger
以及EventTimeTrigger
.ubby3x7f3#
我遵循了大卫和尼拉夫的方法,下面是结果。
1) 使用自定义触发器:
在这里我颠倒了我最初的逻辑。我没有使用“计数窗口”,而是使用一个“时间窗口”,其持续时间与超时相对应,后跟一个触发器,在处理完所有元素后触发。
下面是触发代码:
2) 使用过程函数:
当所有逻辑(即,窗口、触发和求和)进入函数时: