我有以下设置。Kafka的经纪人在Kubernetes身上运行。一个带有kafka流的springboot应用程序和一个输入主题ksteram。
在stream上,我在key上分组,使用aggregation并构建一个materializedview->keyvaluestorerocksdb。数据库包括135048个键/值对。
我像这样读取所有键/值对
private ReadOnlyKeyValueStore<String, PriceDomain> keyValueStore;
public List<PriceModel> fetchAllPriceInErrorState() {
if (this.keyValueStore == null) {
this.keyValueStore = queryService.getQueryableStore(
ModelStrings.PRICE_STORE_NAME,
QueryableStoreTypes.keyValueStore()
);
}
while (keyValueIterator.hasNext()) {
KeyValue<String, PriceDomain> keyValue = keyValueIterator.next();
PriceDomain priceDomain1 = keyValue.value;
PriceModel priceModel = convertToPriceModel(priceDomain1);
priceModelList.add(priceModel);
}
}
这将需要~3-4秒来执行。有没有办法增加执行时间来从存储中获取所有键值对?当然,大多数时候我可以直接通过键访问值,这会很快。但有时我需要所有的键/值对。
假设将来有更多的键/值对(~500-1000k),那么这将花费~5-10倍的时间
谢谢你的帮助
1条答案
按热度按时间xqkwcwgp1#
我认为您必须设置更多的分区(输入主题基于构建哪个存储),而不是运行几个这样的应用程序。每个示例将只使用一些消息子集(key->values),并且运行得更快