Kafka Streams: problem using the Protobuf serde

Posted by lztngnrs on 2021-06-27 in Java

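I am building a Kafka Streams application whose topic data is serialized with Protobuf. We can generate Java bindings for these messages, but I am struggling to use the correct serde to consume data from the topics. Can someone tell me what I am doing wrong here?
Here are the property definitions I use: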

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id-config");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-broker:my-port");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class);

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

My serde class:

public class AppSerdes extends Serdes {

    public static KafkaProtobufSerde<ProtobufClass1> createConfiguredSerde1() {
        KafkaProtobufSerde<ProtobufClass1> serde = new KafkaProtobufSerde<ProtobufClass1>();
        Map<String, Object> serdeConfig = getSerdeConfig();
        serde.configure(serdeConfig, false);
        return serde;
    }

    public static KafkaProtobufSerde<ProtobufClass2> createConfiguredSerde2() {
        KafkaProtobufSerde<ProtobufClass2> serde = new KafkaProtobufSerde<ProtobufClass2>();
        Map<String, Object> serdeConfig = getSerdeConfig();
        serde.configure(serdeConfig, false);
        return serde;
    }

    private static Map<String, Object> getSerdeConfig() {
        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        return serdeConfig;
    }

}
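A note on the class above: both serdes are created with only a generic type parameter. As far as I know, the Confluent Protobuf deserializer then has no concrete class to build and falls back to com.google.protobuf.DynamicMessage. One way to tell it the target type is through the serde config. The sketch below builds such a config map with plain string keys so that it runs without the Confluent jars. The key names are my assumption of what AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG and KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE resolve to, and ProtobufClass1Stub is a hypothetical placeholder for a generated class such as ProtobufClass1.

```java
import java.util.HashMap;
import java.util.Map;

public class SerdeConfigSketch {
    // Assumed string values of the Confluent config constants (not verified here).
    public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
    public static final String SPECIFIC_VALUE_TYPE = "specific.protobuf.value.type";

    // Hypothetical placeholder for a generated protobuf class such as ProtobufClass1.
    public static class ProtobufClass1Stub { }

    // Builds a serde config that also names the concrete message class
    // the value deserializer should produce.
    public static Map<String, Object> serdeConfigFor(Class<?> messageClass) {
        Map<String, Object> config = new HashMap<>();
        config.put(SCHEMA_REGISTRY_URL, "http://localhost:8081");
        config.put(SPECIFIC_VALUE_TYPE, messageClass);
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> config = serdeConfigFor(ProtobufClass1Stub.class);
        System.out.println(config.get(SPECIFIC_VALUE_TYPE));
    }
}
```

In the real application the resulting map would be passed to serde.configure(serdeConfig, false), with the generated class in place of the stub.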
This is how I create the KStream and KTable instances:

StreamsBuilder streamBuilder = new StreamsBuilder();
KTable<String, ProtobufClass1> table = streamBuilder.table("topic1",
        Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde1()));
KStream<String, ProtobufClass2> stream = streamBuilder.stream("topic2",
        Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde2()));

However, I get the following error:
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.String, and value: com.google.protobuf.DynamicMessage. Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: java.lang.ClassCastException: com.google.protobuf.DynamicMessage cannot be cast to iit.datahub.party.system_crm.v1.CustomerAddressBase$CustomerAddressBaseEntity
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:234)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
    ... 11 more
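The message says the value arrived as DynamicMessage and was only cast to the generated class later, inside the processor. A likely mechanism is Java type erasure: a generic parameter alone does not survive to runtime, so a deserializer that is given no concrete class cannot produce one. The following is a minimal, library-free sketch of that mechanism. DynamicMessage, CustomerAddress, and both deserializer classes are hypothetical stand-ins, not the real protobuf or Confluent types.

```java
import java.util.function.Function;

public class ErasureDemo {
    // Hypothetical stand-ins for the generic and the generated message types.
    static class DynamicMessage { }
    static class CustomerAddress {
        String city() { return "Berlin"; }
    }

    // Mimics a serde built with only a generic parameter: T is erased at
    // runtime, so the unchecked cast below is not verified where it is written.
    static class UntypedDeserializer<T> {
        @SuppressWarnings("unchecked")
        T deserialize(byte[] data) {
            return (T) new DynamicMessage();
        }
    }

    // Mimics a serde that is explicitly told how to build the concrete type.
    static class TypedDeserializer<T> {
        private final Function<byte[], T> parser;
        TypedDeserializer(Function<byte[], T> parser) { this.parser = parser; }
        T deserialize(byte[] data) { return parser.apply(data); }
    }

    // Returns true if using the untyped result as CustomerAddress fails.
    public static boolean untypedFailsOnUse() {
        UntypedDeserializer<CustomerAddress> d = new UntypedDeserializer<>();
        try {
            // The compiler inserts the real cast at this assignment.
            CustomerAddress value = d.deserialize(new byte[0]);
            value.city();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Returns the city read through the typed deserializer.
    public static String typedCity() {
        TypedDeserializer<CustomerAddress> d =
                new TypedDeserializer<>(bytes -> new CustomerAddress());
        return d.deserialize(new byte[0]).city();
    }

    public static void main(String[] args) {
        System.out.println(untypedFailsOnUse()); // prints "true"
        System.out.println(typedCity());         // prints "Berlin"
    }
}
```

With the Confluent serde, the analogous step is, to my knowledge, passing the generated class to the constructor, for example new KafkaProtobufSerde<>(ProtobufClass1.class), so that the deserializer can return that type instead of DynamicMessage. I have not verified this against the asker's library version.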
