如何设置时间等待聚合前吐出的消息?

afdcj2ne  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(318)

我的Kafka流聚合读取一个紧凑的主题,并执行以下操作: (0_10, ..) ,
(0_11, ..) ---> (0, [10]) (0, [10, 11]) 我想知道如何控制聚合时间窗口,这样它就不会为每个传入的消息吐出消息,而是等待并聚合其中的一些消息。假设流应用程序使用以下消息: (0_10, ..) (1_11, ..) (0_13, ..) 如果前面的3条消息在短时间内到达,我希望看到: (0,[10]) (0, [10, 13]) (1, [11]) 我不知道如何告诉我的kafka流应用程序在输出一个新值之前等待更多的聚合需要多长时间。
我的代码很简单

builder
    .table(keySerde, valueSerde, sourceTopic)
    .groupBy(StreamBuilder::groupByMapper)
    .aggregate(
        StreamBuilder::aggregateInitializer,
        StreamBuilder::aggregateAdder,
        StreamBuilder::aggregateSubtractor)
    .to(...);

目前,它有时会成批聚合,但不确定如何调整:

{"Aggregate":[100]}
{"Aggregate":[100,300,301,302]}
{"Aggregate":[100,300,301,302,404]}
sdnqo3pr

sdnqo3pr1#

我想知道如何控制聚合时间窗口,这样它就不会为每个传入的消息吐出消息,而是等待并聚合其中的一些消息。
这是不可能与Kafka流的窗口。一般来说,kafka streams窗口不会“关闭”或“结束”,因为你不能告诉它一旦窗口“关闭”就会产生最终结果(没有这样的概念)。这是为了适应迟到的结果。当消息到达聚合窗口时,您将看到更新。kafka流吐出更新的频率取决于缓存(见下文)。更多信息请参见:如何发送时间窗口ktable的最终kafka流聚合结果?
目前,它有时会成批聚合,但不确定如何调整:
你在那里看到的很可能是缓存在后台商店的结果 KTables . KTables 只有当下游消息的changelogs刷新并且提交了它们的偏移量时,才转发下游消息。这是为了在需要恢复其状态时保持一致性。如果您更改kafka streams应用程序的提交间隔,那么缓存刷新的频率将降低,因此您将看到较少的从应用程序转发的更新 KTable (变更日志、聚合等)。但这与开窗无关。
综上所述,如果您想对changelog流进行窗口聚合,您可以将其从 KTableKStream 使用 KTable#toStream() . 然后可以在聚合步骤中指定窗口。

相关问题