ApacheKafka—如何以避免重新创建状态存储的方式重新启动kafkastreams拓扑

0h4hbjxa  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(238)

我用kafkastreams dsl和处理器api编写了一些代码来进行实时计算:

KStreamBuilder builder = new KStreamBuilder();
  KStream<String, String> logs = builder.stream(stringSerde, stringSerde, ADDCASH_TOPIC_NAME);

  StateStoreSupplier countStore = Stores.create("Counts")
            .withKeys(Serdes.String())
            .withValues(Serdes.Long())
            .persistent()
            .build();

 logs.process(() -> new AddCashProcessor(), countStore.name());

我想知道:当我重新启动拓扑时,如果 Stores.create(..) 方法将重新创建statestore?

kqlmhetl

kqlmhetl1#

它将重用现有状态,并且仅在没有存储时创建和清空/新建存储。

相关问题