kafka流物化存储构建错误

2lpgd968  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(277)

我试图在这里构建materialized.as dsl代码:https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/stores.html
但我得到了错误

incompatible types: org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted to org.apache.kafka.common.serialization.Serde<java.lang.Object>

在线上

.withKeySerde(Serdes.Long())

有人知道这里可能出了什么问题吗?

final StreamsBuilder builder = new StreamsBuilder();

   KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
   KTable<Long,String> dataStore = builder.table(
     "example_stream",
     Materialized.as(storeSupplier)
             .withKeySerde(Serdes.Long())
             .withValueSerde(Serdes.String()));
mxg2im7a

mxg2im7a1#

问题是 builder.table 不知道默认为的泛型类型 <Object,Object> . 后来,serde类型不匹配。您需要指定如下类型

KTable<Long,String> dataStore = builder.<Long,String>table(
    "example_stream",
    Materialized.as(storeSupplier)
        .withKeySerde(Serdes.Long())
        .withValueSerde(Serdes.String()));
k7fdbhmy

k7fdbhmy2#

我不能肯定没有一个代码样本,但是错误信息是非常清楚的。您正在向Kafka指定密钥的类型 Long . 但是,您的键实际上是其他一些java对象。例如,如果您有一条带有字符串键的消息,则此代码将更改为: .withKeySerde(Serdes.String()) . 检查键的类型并指定正确的 Serde 对于那种类型。

相关问题