我的申请中有Kafka流处理:
myStream
.mapValues(customTransformer::transform)
.groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
.windowedBy(TimeWindows.of(10000L).advanceBy(10000L))
.aggregate(CustomCollectorObject::new,
(key, value, aggregate) -> aggregate.collect(value),
Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
.withValueSerde(new CustomCollectorSerde()))
.toStream()
.foreach((k, v) -> /* do something very important */);
预期行为:传入消息按键分组,并在某个时间间隔内聚合到 CustomCollectorObject
. CustomCollectorObject
只是一个有 List
在里面。每10秒后 foreach
我正在用我的汇总数据做一些非常重要的事情。我想最重要的是 foreach
每10秒呼叫一次!
实际行为:我可以在我的 foreach
被称为稀有,大约每30-35秒,这并不重要。重要的是,我一次收到3-4条信息。
问题是:我怎样才能达到预期的行为?我需要确认我的数据是在运行时处理的,没有任何延迟。
我试着把 cache.max.bytes.buffering: 0
但在这种情况下,窗口根本不起作用。
1条答案
按热度按时间njthzxwz1#
kafka streams有不同的执行模型,并提供不同的语义,即,您的期望与kafka streams不匹配。已经有多个类似的问题:
如何发送时间窗口ktable的最终kafka流聚合结果?
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
https://www.confluent.io/blog/streams-tables-two-sides-same-coin
还要注意的是,社区目前正在开发一个名为
suppress()
能够提供您想要的语义:https://cwiki.apache.org/confluence/display/kafka/kip-328%3a+ability+to+suppress+updates+for+ktables现在,您需要添加
transform()
并使用标点符号来获得所需的语义(c.f。https://docs.confluent.io/current/streams/developer-guide/processor-api.html#defining-a流处理器)