Kafka 无法获取状态存储wf-spec-name,因为拓扑中未注册此类存储

64jmpszr  于 2022-12-11  发布在  Apache
关注(0)|答案(1)|浏览(81)

我有一个简单的KafkaStreams拓扑结构,它包含一个主题'foo',其中每个记录都是一个JSON blob(我将其串接到一个TaskDefSchema中),以某个guid为键.我希望构造两个可查询的GlobalKTable 's,这样就有两个ReadOnlyKeyValueStore<String, FooThing>'s:
1.一个由guid键控(即输入主题foo未更改)。
1.一个由JSON blob中的TaskDefSchema.name字段作为键控。
下面是我的代码:

StreamsBuilder builder = new StreamsBuilder();

        KStream<String, TaskDefSchema> taskDefEvents = builder.stream(
            config.getTaskDefTopic(), // 'foo' in the question above
            Consumed.with(Serdes.String(), new TaskDefSerdes())
        );

        KStream<String, TaskDefSchema> taskDefEventsNameKeyed = taskDefEvents.selectKey(
            ((k, v) -> v.name)
        );
        String intermediateTopicByName = config.getTaskDefTopic() + "__intermediate_name";
        String intermediateTopicByGuid = config.getTaskDefTopic() + "__intermediate_guid";

        taskDefEventsNameKeyed.to(intermediateTopicByName);
        taskDefEvents.to(intermediateTopicByGuid);

        this.taskDefNameTable = builder.globalTable(
            intermediateTopicByName,
            Materialized.<String, TaskDefSchema, KeyValueStore<Bytes, byte[]>>
                as("wf-spec-name")
                .withKeySerde(Serdes.String())
                .withValueSerde(new TaskDefSerdes())
        );

        this.taskDefGuidTable = builder.globalTable(
            intermediateTopicByGuid,
            Materialized.<String, TaskDefSchema, KeyValueStore<Bytes, byte[]>>
                as("wf-spec-guid")
                .withKeySerde(Serdes.String())
                .withValueSerde(new TaskDefSerdes())
        );

        return builder.build();

当我试着查询商店时如下:

kafkaStreams.store(
            StoreQueryParameters.fromNameAndType(
                "wf-spec-guid",
                QueryableStoreTypes.keyValueStore()
            )
        );

函数调用返回一个按预期工作的KeyValueStore。* 注意:上面的块对应于第二个全局表-taskDefGuidTable。*
但是,当我执行以下调用以检索重新键控的globalTable(即taskDefGuidTable)时,我得到一个错误:

kafkaStreams.store(
            StoreQueryParameters.fromNameAndType(
                "wf-spec-name", // this is the line that's different (:
                QueryableStoreTypes.keyValueStore()
            )
        );

/* *********************** */

org.apache.kafka.streams.errors.UnknownStateStoreException: Cannot get state store wf-spec-name because no such store is registered in the topology.

任何帮助都将不胜感激。谢谢!

8yoxcaq7

8yoxcaq71#

我查询的kafkaStreams对象是错误的(我的代码中有多个KafkaStreams ...哎呀)。

相关问题