kafka流示例上状态存储和分区的混合

e0bqpujr  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(304)

我建立了一个Kafka流应用程序与国家商店。现在我正在尝试扩展这个应用程序。当在三个不同的服务器上运行应用程序时,kafka会随机分割分区和状态存储。
例如:
instance1获取:partition-0,partition-1
instance2获取:partition-2,statestore-repartition-0
示例3获取:statestore-repartition-1、statestore-repartition-2
我想为每个示例分配一个statestore和一个分区。我做错什么了?
我的kafkastreams配置:

final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);

try {
     properties.setProperty(StreamsConfig.STATE_DIR_CONFIG,
           Files.createTempDirectory(stateStoreName).toAbsolutePath().toString());
} catch (final IOException e) {
         // use the default one
}

我的流是:

stream.groupByKey()
       .windowedBy(TimeWindows.of(timeWindowDuration))
       .<TradeStats>aggregate(
           () -> new TradeStats(),
           (k, v, tradestats) -> tradestats.add(v),
           Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as(stateStoreName)
        .withValueSerde(new TradeStatsSerde()))
        .toStream();
jgwigjjp

jgwigjjp1#

从我目前所看到的情况来看(正如我在对您的问题的评论中所提到的,请分享您的状态存储定义),一切都很好,我怀疑您对这个问题有一点误解
我做错什么了?
基本上没什么。:-)
对于问题的分区部分:它们根据配置的赋值器分布在使用者周围(请参阅https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/cooperativestickyassignor.html 或相邻接口)。
对于问题的状态存储部分:这里可能存在一些关于(内存中)状态存储如何工作的误解:它们通常由一个kafka主题支持,该主题不在应用程序主机上,而是在kafka集群本身中。更准确地说,整个状态存储的一部分存在于每个应用程序主机上的(rocksdb)内存键/值存储中,正如您在问题中的状态存储分配中所示。但是,这些只是kafka集群中维护的完整状态存储的一部分或部分。
所以简单地说:一切都很好,让Kafka来完成任务,只有在你有真正特殊的用例或者很好的理由的时候才去干涉它kafka还可以确保在应用程序主机中断的情况下正确地冗余和重新平衡所有分区。
如果您仍然想自己分配一些东西,那么用例将很有趣,以获得进一步的帮助。

相关问题