如何在同一主题上使用globalktable和statestore?

odopli94  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(367)

澄清一下,我是Kafka的新手,所以如果我的问题似乎没有文件记录,我很抱歉,我正在阅读教程,文档和一切我能理解这一点。
我试图从globalstore读取所有值以更新它的值,然后使用已经存在的statestore来放置这些新的更新值。
我这么做是因为当我这么做的时候:

this.stateStore.all();

我只有十分之一的数据,如果我理解正确的话,这是因为我有10个分区,而ss只读取一个分区(尽管我不太明白为什么)
这是我的全球表:

public StreamsBuilder declareTopology(StreamsBuilder builder) {

        logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
                getInputTopic(),
                getDataTopic(),
                getToEsTopic());

        builder.globalTable(
                getDataTopic(),
                Consumed.with(Serdes.String(), fooSerdes)
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
                Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
                        "foosktable")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(fooSerdes)
                        .withLoggingEnabled(new HashMap<>()));
    ...

这是addstatestore,我不能删除它,因为它在代码的其他地方使用:

...

       builder.addStateStore(
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("foosktable"),
                    Serdes.String(),
                    fooSerdes));
    ...

    return builder;
}

所以,理论上,我想做的是删除同样使用同一主题的statestore,然后使用我的data.process主题之一放置数据,问题是这个处理器用这个statestore做其他事情,所以我不能取消它。
我在这里迷路了,任何光线都会很有帮助。谢谢!

uidvcgyl

uidvcgyl1#

有点不清楚你到底想达到什么目的。但是,有一些高层次的解释:
GlobalKTable 只有一个目的:从一个主题中读取数据而不做任何修改,从而允许 KStream-GlobalKTable -通过“交互式查询”加入或查询商店。
因此,您不能真正做您想做的事情,因为从全局存储复制数据到另一个存储是不可能的。您需要复制输入主题并阅读两遍:(1)作为 GlobalKTable (2)作为常规 KStream 在将数据放入存储之前修改数据。对于(2)你可以使用 transform() .
希望这有帮助。

相关问题