为什么我看不到kafka streams reduce方法的任何输出?

3okqufwl  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(451)

给定以下代码:

KStream<String, Custom> stream =  
    builder.stream(Serdes.String(), customSerde, "test_in");

stream
    .groupByKey(Serdes.String(), customSerde)
    .reduce(new CustomReducer(), "reduction_state")
    .print(Serdes.String(), customSerde);

我有一个 println 在reducer的apply方法中的语句,它在我期望reduce发生时成功地打印出来。但是,上面显示的最终print语句不显示任何内容。如果我使用 to 方法而不是 print ,我在目标主题中未看到任何消息。
在reduce语句之后,我需要什么来查看reduce的结果?如果将一个值推送到输入,我不希望看到任何东西。如果按下具有相同键的第二个值,我希望应用缩减器(它确实如此),并且我还希望缩减的结果继续到处理管道中的下一步。如前所述,我在管道的后续步骤中没有看到任何东西,我不明白为什么。

aydmsdu9

aydmsdu91#

从Kafka开始 0.10.1.0 所有聚合运算符都使用内部重复数据消除缓存来减少结果ktable changelog流的负载。例如,如果您直接对两个具有相同键的记录进行计数和处理,那么完整的changelog流将是 <key:1>, <key:2> .
使用新的缓存特性,缓存将接收 <key:1> 把它储存起来,但不要马上把它送到下游。什么时候 <key:2> 它将替换缓存的第一个条目。根据缓存大小、不同密钥的数量、吞吐量和提交间隔,缓存会向下游发送条目。这发生在单个键条目的缓存逐出时,或者作为缓存的完全刷新(将所有条目发送到下游)。因此,ktable changelog可能只显示 <key:2> (因为 <key:1> 已消除重复)。
您可以通过streams配置参数控制缓存的大小 StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG . 如果将该值设置为零,则完全禁用缓存,ktable changelog将包含所有更新(有效地提供了预缓存) 0.10.1.0 行为)。
confluent文档包含一节,详细解释了缓存:
http://docs.confluent.io/current/streams/architecture.html#record-缓存
http://docs.confluent.io/current/streams/developer-guide.html#streams-开发人员指南内存管理

相关问题