我使用apply函数来获得唯一的计数。但我想在唯一数据的数量发生变化时收集计数。代码:
hashMap .keyBy(x => x.hash) .timeWindow(Time.minutes(15)) .apply(new DataWindow())
但是应用函数是在时间窗口结束时触发的,如何在没有滑动窗口的情况下更频繁地获取值。
gojuced71#
我建议使用 ProcessFunction 而不是Windows。您需要使用键分区状态来保存决定用于跟踪唯一值的任何数据结构。您可以使用事件时间计时器或处理时间计时器每隔15分钟清除一次状态,具体取决于适合您的应用程序的时间类型。但是如果你想继续使用窗口,你可以实现一个自定义的 Trigger . 在这种情况下,您需要保持triggercontext上可用的分区状态。另请参阅有关窗口和触发器的更多信息。
ProcessFunction
Trigger
1条答案
按热度按时间gojuced71#
我建议使用
ProcessFunction
而不是Windows。您需要使用键分区状态来保存决定用于跟踪唯一值的任何数据结构。您可以使用事件时间计时器或处理时间计时器每隔15分钟清除一次状态,具体取决于适合您的应用程序的时间类型。但是如果你想继续使用窗口,你可以实现一个自定义的
Trigger
. 在这种情况下,您需要保持triggercontext上可用的分区状态。另请参阅有关窗口和触发器的更多信息。