我试图在这里构建materialized.as dsl代码:https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/stores.html
但我得到了错误
incompatible types: org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted to org.apache.kafka.common.serialization.Serde<java.lang.Object>
在线上
.withKeySerde(Serdes.Long())
有人知道这里可能出了什么问题吗?
final StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
KTable<Long,String> dataStore = builder.table(
"example_stream",
Materialized.as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()));
2条答案
按热度按时间mxg2im7a1#
问题是
builder.table
不知道默认为的泛型类型<Object,Object>
. 后来,serde类型不匹配。您需要指定如下类型k7fdbhmy2#
我不能肯定没有一个代码样本,但是错误信息是非常清楚的。您正在向Kafka指定密钥的类型
Long
. 但是,您的键实际上是其他一些java对象。例如,如果您有一条带有字符串键的消息,则此代码将更改为:.withKeySerde(Serdes.String())
. 检查键的类型并指定正确的Serde
对于那种类型。