给定以下代码:
KStream<String, Custom> stream =
builder.stream(Serdes.String(), customSerde, "test_in");
stream
.groupByKey(Serdes.String(), customSerde)
.reduce(new CustomReducer(), "reduction_state")
.print(Serdes.String(), customSerde);
我有一个 println
在reducer的apply方法中的语句,它在我期望reduce发生时成功地打印出来。但是,上面显示的最终print语句不显示任何内容。如果我使用 to
方法而不是 print
,我在目标主题中未看到任何消息。
在reduce语句之后,我需要什么来查看reduce的结果?如果将一个值推送到输入,我不希望看到任何东西。如果按下具有相同键的第二个值,我希望应用缩减器(它确实如此),并且我还希望缩减的结果继续到处理管道中的下一步。如前所述,我在管道的后续步骤中没有看到任何东西,我不明白为什么。
1条答案
按热度按时间aydmsdu91#
从Kafka开始
0.10.1.0
所有聚合运算符都使用内部重复数据消除缓存来减少结果ktable changelog流的负载。例如,如果您直接对两个具有相同键的记录进行计数和处理,那么完整的changelog流将是<key:1>, <key:2>
.使用新的缓存特性,缓存将接收
<key:1>
把它储存起来,但不要马上把它送到下游。什么时候<key:2>
它将替换缓存的第一个条目。根据缓存大小、不同密钥的数量、吞吐量和提交间隔,缓存会向下游发送条目。这发生在单个键条目的缓存逐出时,或者作为缓存的完全刷新(将所有条目发送到下游)。因此,ktable changelog可能只显示<key:2>
(因为<key:1>
已消除重复)。您可以通过streams配置参数控制缓存的大小
StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
. 如果将该值设置为零,则完全禁用缓存,ktable changelog将包含所有更新(有效地提供了预缓存)0.10.1.0
行为)。confluent文档包含一节,详细解释了缓存:
http://docs.confluent.io/current/streams/architecture.html#record-缓存
http://docs.confluent.io/current/streams/developer-guide.html#streams-开发人员指南内存管理