试图实现一个flink作业来读取kafka流并聚合会话,由于某些原因,没有调用getresult()。我看到createaccumulator()和add()被调用,我希望getresult()也被调用,这样我就可以在目的地接收聚合的消息。
source.keyBy(new KeySelector<GenericRecord, String>() {
@Override
public String getKey(GenericRecord record) {
return record.get("id").toString();
}})
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<GenericRecord>() {
private static final long serialVersionUID = -4834111073247835189L;
private final long maxTimeLag = 300000L;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(GenericRecord lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp - maxTimeLag);
}
@Override
public long extractTimestamp(GenericRecord element, long previousElementTimestamp) {
long ts = 1000 * (long)element.get(("timestamp"));
return (ts);
}
})
.map(new ReduceAttributesMap())
.keyBy(new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> e) {
return e.f0;
}
})
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
.aggregate(new EventAggregation())
.addSink(...)
有什么问题吗?我是不是误解了什么?谢谢你的帮助!
1条答案
按热度按时间des4xlb01#
AggregateFunction#getResult()
仅在窗口完成时调用。在您的情况下,只有在5分钟后没有特定键的事件时才会发出窗口。你能在你的资料里确认这个案子确实发生了吗?您可以尝试减少会话窗口的间隔时间,以便更容易地看到它。此外,您的水印分配者看起来很可疑。你可能想用
BoundedOutOfOrdernessTimestampExtractor
. 最后,你能再次检查一下你的时间提取是否按预期工作吗?自1970年以来,时间戳是否存储为秒?