kafka流窗口聚合批处理

ylamdve6  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(531)

我的申请中有Kafka流处理:

myStream
    .mapValues(customTransformer::transform)
    .groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
    .windowedBy(TimeWindows.of(10000L).advanceBy(10000L))
    .aggregate(CustomCollectorObject::new,
            (key, value, aggregate) -> aggregate.collect(value),
            Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
                    .withValueSerde(new CustomCollectorSerde()))
    .toStream()
    .foreach((k, v) -> /* do something very important */);

预期行为:传入消息按键分组,并在某个时间间隔内聚合到 CustomCollectorObject . CustomCollectorObject 只是一个有 List 在里面。每10秒后 foreach 我正在用我的汇总数据做一些非常重要的事情。我想最重要的是 foreach 每10秒呼叫一次!
实际行为:我可以在我的 foreach 被称为稀有,大约每30-35秒,这并不重要。重要的是,我一次收到3-4条信息。
问题是:我怎样才能达到预期的行为?我需要确认我的数据是在运行时处理的,没有任何延迟。
我试着把 cache.max.bytes.buffering: 0 但在这种情况下,窗口根本不起作用。

njthzxwz

njthzxwz1#

kafka streams有不同的执行模型,并提供不同的语义,即,您的期望与kafka streams不匹配。已经有多个类似的问题:
如何发送时间窗口ktable的最终kafka流聚合结果?
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
https://www.confluent.io/blog/streams-tables-two-sides-same-coin
还要注意的是,社区目前正在开发一个名为 suppress() 能够提供您想要的语义:https://cwiki.apache.org/confluence/display/kafka/kip-328%3a+ability+to+suppress+updates+for+ktables
现在,您需要添加 transform() 并使用标点符号来获得所需的语义(c.f。https://docs.confluent.io/current/streams/developer-guide/processor-api.html#defining-a流处理器)

相关问题