澄清一下,我是Kafka的新手,所以如果我的问题似乎没有文件记录,我很抱歉,我正在阅读教程,文档和一切我能理解这一点。
我试图从globalstore读取所有值以更新它的值,然后使用已经存在的statestore来放置这些新的更新值。
我这么做是因为当我这么做的时候:
this.stateStore.all();
我只有十分之一的数据,如果我理解正确的话,这是因为我有10个分区,而ss只读取一个分区(尽管我不太明白为什么)
这是我的全球表:
public StreamsBuilder declareTopology(StreamsBuilder builder) {
logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
getInputTopic(),
getDataTopic(),
getToEsTopic());
builder.globalTable(
getDataTopic(),
Consumed.with(Serdes.String(), fooSerdes)
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
"foosktable")
.withKeySerde(Serdes.String())
.withValueSerde(fooSerdes)
.withLoggingEnabled(new HashMap<>()));
...
这是addstatestore,我不能删除它,因为它在代码的其他地方使用:
...
builder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("foosktable"),
Serdes.String(),
fooSerdes));
...
return builder;
}
所以,理论上,我想做的是删除同样使用同一主题的statestore,然后使用我的data.process主题之一放置数据,问题是这个处理器用这个statestore做其他事情,所以我不能取消它。
我在这里迷路了,任何光线都会很有帮助。谢谢!
1条答案
按热度按时间uidvcgyl1#
有点不清楚你到底想达到什么目的。但是,有一些高层次的解释:
一
GlobalKTable
只有一个目的:从一个主题中读取数据而不做任何修改,从而允许KStream-GlobalKTable
-通过“交互式查询”加入或查询商店。因此,您不能真正做您想做的事情,因为从全局存储复制数据到另一个存储是不可能的。您需要复制输入主题并阅读两遍:(1)作为
GlobalKTable
(2)作为常规KStream
在将数据放入存储之前修改数据。对于(2)你可以使用transform()
.希望这有帮助。