我试图了解globalktable在kafka中是如何工作的,为此我尝试编写示例代码。我已经创建了globalktable,但我想看到我也尝试了peek函数,但它不可用,现在我尝试按视图,但它给出了编译时错误。
在scala中查看globalktable的写入方式是什么?
我试过的是
'''
val genderGlobalTable: GlobalKTable[String, abc] = builder
.globalTable(kafkaStreamConfig.getString("abc-topic"),
Materialized.as("abcStore")
.withKeySerde(stringSerde)
.withValueSerde(abcSerde))
implicit val streams: KafkaStreams = new KafkaStreams(builder.build(), properties)
val view: ReadOnlyKeyValueStore[String, abc] = streams
.store("abcStore", new QueryableStoreType[abc])
'''
1条答案
按热度按时间7y4bm7vi1#
无法创建的示例
QueryableStoreType
这是一个接口。相反,您需要使用工厂类QueryableStoreTypes
(注意,名称是复数,而接口名称是单数)以获取所需的类型。为了一个
GlobalKTable
你可以用QueryableStoreTypes.keyValueStore()
(或根据您的Kafka流版本,QueryableStoreTypes.timestampedKeyValueStore()
).