KStream<Integer, Integer> stream;
KGroupedStream<Integer, Integer> grouped = stream.groupByKey();
KTable<Integer, Integer> aggregated = grouped.aggregate(
() -> 0,
(k, i, agg) -> {
if (agg == null)
agg = 0;
Integer sum = agg + i;
return sum > 100 ? null : sum;
});
我的流上的消息是:
(1, 50)
(1, 75)
(1, 50)
当第二条消息到达时,聚合器返回null。做 KTable aggregated
接收(1,null)并删除key=1的状态?
当消息#3到达时 agg
null或再次调用初始值设定项并设置 agg
到0?
如果我使用reduce而不是aggregate,那么如果reducer返回null,那么下一条消息会经过reducer吗?或者它会像组中的第一条消息那样“按原样”使用吗?
谢谢,大卫
1条答案
按热度按时间li9yvcax1#
当第二条消息到达时,聚合器返回null。ktable aggregated是否接收(1,null)并删除key=1的状态?
对。
当消息#3到达时,agg为null,或者是否再次调用初始值设定项并将agg设置为0?
再次调用初始值设定项。
如果我使用reduce而不是aggregate,那么如果reducer返回null,那么下一条消息会经过reducer吗?或者它会像组中的第一条消息那样“按原样”使用吗?
减少工程量。因此,如果你回来
null
,以下消息将被视为第一条消息进行处理。元评论:为什么你不运行代码并尝试一下???