我注意到 aggregate()
stage似乎可以序列化/反序列化每个元素,即使它周期性地发出结果。
streamBuilder
.stream(inputTopic, Consumed.`with`(keySerde, inputValueSerde))
.groupByKey(Serialized.`with`(keySerde, inputValueSerde))
.aggregate(
() => Snapshot.Initial(),
(_, event, prevSnap: Snapshot) => {
// ...
},
Materialized.as(stateStoreName).withValueSerde(snapshotSerde)
)
.toStream()
我希望键值存储在内存中工作,直到有一个write-on-commit。看起来不仅对每个更新都进行了写操作,而且还进行了反序列化的读操作。有人能解释下这是怎么工作的吗?我是否应该关心性能?
1条答案
按热度按时间rvpgvaaj1#
您对数据总是(反)序列化的观察是正确的,即使所有数据都在内存中。Kafka溪流的所有商店都基于
byte[]
允许适当内存管理的数组。反序列化的堆上java对象的大小未知,使得内存管理变得困难,内存使用不可预测。您的存储仍然可以在内存中工作,并且只有在必要时和提交时才会写入磁盘。