java—更改kafka流拓扑(添加重新分区步骤)对消息处理有影响吗

nr9pn0ug  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(204)

假设我想做一些可配置的转换。此转换使用状态存储管理某些状态,并且还需要重新分区,这意味着只有在配置时才能进行重新分区。现在,如果我以以下方式(或任何其他组合)运行应用程序3次(也可能是滚动升级):-
转换“a”已禁用
转换“a”已启用
转换“a”已禁用
假设这三次跑步都使用同一组Kafkabrokers:-
如果启用eos,eos保证是否在所有3次运行中都存在?
如果eos未启用,是否存在可能导致消息丢失的情况(至少一次未能提供)?
拓扑代码以更好地理解我正在尝试做的事情:-

KStream<String, Cab> kStream = getStreamsBuilder()
            .stream("topic_a", Consumed.with(keySerde, valueSerde))
            .transformValues(() -> transformer1)
            .transformValues(() -> transformer2, "stateStore_a")
            .flatMapValues(events -> events);

    mayBeEnrichAgain(kStream, keySerde, valueSerde)
            .selectKey((ignored, event) -> event.getAnotherId())
            .through(INTERMEDIATE_TOPIC_2, Produced.with(keySerde, valueSerde)) //this repartitioning will always be there
            .transformValues(() -> transformer3, "stateStore_b")
            .to(txStreamsConfig.getAlertTopic(), Produced.with(keySerde, valueSerde));

private <E extends Cab> KStream<String, E> mayBeEnrichAgain(final KStream<String, E> kStream,
        final Serde<String> keySerde,
        final Serde<E> valueSerde) {

    if(enrichmentEnabled){ //repartitioning is configurable
            return kStream.selectKey((ignored, event) -> event.id())
                    .through(INTERMEDIATE_TOPIC_1, Produced.with(keySerde, valueSerde))
                    .transformValues(enricher1)
                    .transformValues(enricher2);
    }
    else{
            return kStream;
    }
}
pjngdqdw

pjngdqdw1#

您不能简单地更改拓扑而不破坏它。
一般来说很难说插入贯穿主题是否会首先破坏应用程序。
如果它没有中断,您可能会在删除主题时“丢失”数据,因为某些未处理的数据可能仍在该主题中,并且在删除主题后,拓扑将不会读取这些数据。
通常,您应该干净地重置应用程序或使用新的 application.id 如果将应用程序升级到更改拓扑结构的较新版本。

相关问题