Kafka Streams deserialization exception

z9smfwbn  posted on 2023-08-02  in Apache

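I am trying to send messages with a String key and a Double value to a Kafka topic, and then forward them to another Kafka topic using Kafka Streams.
The command I use to produce messages to the Kafka topic: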
(command not preserved in the source)
The command I use to consume messages from the Kafka topic:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic --from-beginning --property "key.separator= - " --property "print.key=true"
My Kafka Streams application is:

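import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;

// The class name is a placeholder; the original post shows only main().
public class TestApp {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Read String keys and Double values from the input topic.
        KStream<String, Double> inputTopic =
                builder.stream("a-topic", Consumed.with(Serdes.String(), Serdes.Double()));
        inputTopic.print(Printed.toSysOut());

        // Forward every record unchanged to the output topic.
        inputTopic.to("b-topic", Produced.with(Serdes.String(), Serdes.Double()));

        Topology topology = builder.build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
        // Register the shutdown hook before starting so the client is closed on exit.
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
        kafkaStreams.start();
    }
}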

I also tried specifying the value serializer when producing the messages:
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic your_topic_name --property "parse.key=true" --property "key.separator=:" --property "value.serializer=org.apache.kafka.common.serialization.DoubleSerializer"
The error I get is:

org.apache.kafka.common.errors.SerializationException: Size of data received by Deserializer is not 8
    at org.apache.kafka.common.serialization.DoubleDeserializer.deserialize(DoubleDeserializer.java:28)
    at org.apache.kafka.common.serialization.DoubleDeserializer.deserialize(DoubleDeserializer.java:21)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:204)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:128)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:305)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:984)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1591)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:985)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:762)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:613)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575)
14:19:43.085 [test-app-efdaea2c-faab-49e9-a371-c9109d0cc96c-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams -- stream-client [test-app-efdaea2c-faab-49e9-a371-c9109d0cc96c] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.


The data I am trying to send looks like this:
test:12.34
test2:123.454
test3:12345.321

  • But when I change Serdes.Double() to Serdes.String() in Consumed.with, I do not get any error:
KStream<String, String> inputTopic = builder.stream("a-topic", Consumed.with(Serdes.String(), Serdes.String()));

  • Likewise, when I change Serdes.Double() to Serdes.String() in Produced.with, I do not get any error:
inputTopic.to("b-topic", Produced.with(Serdes.String(), Serdes.String()));


mbskvtky1#

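Regarding "The command I use to produce messages to the Kafka topic":
The console producer serializes data as strings by default, which is why the String Serde / deserializer works. A value such as 12.34 arrives as the 5 bytes of its text, whereas DoubleDeserializer expects exactly 8 bytes, hence the "Size of data received by Deserializer is not 8" error.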
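The size mismatch can be reproduced without a broker. The sketch below is only an illustration: the class and method names are made up for this example, and it uses plain java.nio rather than Kafka's own serializers, on the assumption that a double on the wire is the usual 8-byte big-endian encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Kafka.
public class DoubleBytesDemo {

    // What a line-based text producer sends: the UTF-8 bytes of the typed characters.
    static byte[] asText(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // A fixed 8-byte big-endian encoding of the numeric value.
    static byte[] asDouble(double value) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    public static void main(String[] args) {
        // The sample values from the question.
        for (String sample : new String[] {"12.34", "123.454", "12345.321"}) {
            int textLength = asText(sample).length;
            int doubleLength = asDouble(Double.parseDouble(sample)).length;
            System.out.println(sample + ": " + textLength + " bytes as text, "
                    + doubleLength + " bytes as double");
        }
    }
}
```

None of the three text values is 8 bytes long, so a deserializer that insists on exactly 8 bytes rejects each of them.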
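Clear the topic and start over with appropriate producer settings, so that the values are written as 8-byte doubles.
Alternatively, you can write a separate, runnable producer class instead of using the console tools.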
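A standalone producer along those lines might look like the following. This is a minimal sketch, not the answerer's code: the class name DoubleProducer and the helper toRecord are hypothetical, the topic name a-topic and the broker address are taken from the question, and it assumes the kafka-clients library is on the classpath.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical producer class for illustration.
public class DoubleProducer {

    // Splits "key:value" at the first ':' and parses the value as a double.
    static ProducerRecord<String, Double> toRecord(String topic, String line) {
        int sep = line.indexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("Expected key:value but got: " + line);
        }
        String key = line.substring(0, sep);
        Double value = Double.valueOf(line.substring(sep + 1).trim());
        return new ProducerRecord<>(topic, key, value);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The key is sent as text; the value is sent as an 8-byte double.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class.getName());

        // try-with-resources closes the producer even if a send fails.
        try (KafkaProducer<String, Double> producer = new KafkaProducer<>(props)) {
            for (String line : new String[] {"test:12.34", "test2:123.454", "test3:12345.321"}) {
                producer.send(toRecord("a-topic", line));
            }
            // Make sure buffered records are sent before the program exits.
            producer.flush();
        }
    }
}
```

Records written this way match Consumed.with(Serdes.String(), Serdes.Double()) in the Streams application. Text-encoded records already in the topic would still fail to deserialize, which is why the topic should be cleared first.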
