将kafka流写入主题时如何处理avro反序列化异常

im9ewurl  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(212)

在向主题写入流时,我看到了一些异常

Output:
    Exception in thread "StreamThread-1" 
    org.apache.kafka.streams.errors.StreamsException: Failed to deserialize 
    value for record. topic=input_topic, partition=4, offset=9048083
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 572

这是密码。键为null(字符串),值为avroserde

streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

我使用的是特定的avro serde,所以我给出了schema registry的端点

final Map<String, String> serdeConfig = Collections.singletonMap(
            AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    final Serde<avroschema> avroserde = new SpecificAvroSerde<>();
    MasterSpinsSerde.configure(serdeConfig, false); // `false` for record values

如下所示读取源流

final KStreamBuilder builder = new KStreamBuilder();
    final KStream<String, avroschema> feeds = builder.stream("input_topic");

    feeds.to(Serdes.String(), avroserde,"output_topic");
    return new KafkaStreams(builder, streamsConfiguration);

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题