在向主题写入流时,我看到了一些异常
Output:
Exception in thread "StreamThread-1"
org.apache.kafka.streams.errors.StreamsException: Failed to deserialize
value for record. topic=input_topic, partition=4, offset=9048083
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 572
这是密码。键为null(字符串),值为avroserde
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
我使用的是特定的avro serde,所以我给出了schema registry的端点
final Map<String, String> serdeConfig = Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
final Serde<avroschema> avroserde = new SpecificAvroSerde<>();
MasterSpinsSerde.configure(serdeConfig, false); // `false` for record values
如下所示读取源流
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, avroschema> feeds = builder.stream("input_topic");
feeds.to(Serdes.String(), avroserde,"output_topic");
return new KafkaStreams(builder, streamsConfiguration);
暂无答案!
目前还没有任何答案,快来回答吧!