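I am trying to send messages to a Kafka topic whose key is a String and whose value is a Double, and then forward them to another Kafka topic using Kafka Streams.
The command I use to produce messages to the Kafka topic: [...]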
The command I use to consume messages from the Kafka topic:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic --from-beginning --property "key.separator= - " --property "print.key=true"
My Kafka Streams application is:
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Double> inputTopic = builder.stream("a-topic", Consumed.with(Serdes.String(), Serdes.Double()));
    inputTopic.print(Printed.toSysOut());
    inputTopic.to("b-topic", Produced.with(Serdes.String(), Serdes.Double()));
    inputTopic.print(Printed.toSysOut());
    Topology topology = builder.build();
    KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
    kafkaStreams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(() -> kafkaStreams.close()));
}
I also tried specifying the value serializer when producing messages:
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic your_topic_name --property "parse.key=true" --property "key.separator=:" --property "value.serializer=org.apache.kafka.common.serialization.DoubleSerializer"
The error I get is:
org.apache.kafka.common.errors.SerializationException: Size of data received by Deserializer is not 8
at org.apache.kafka.common.serialization.DoubleDeserializer.deserialize(DoubleDeserializer.java:28)
at org.apache.kafka.common.serialization.DoubleDeserializer.deserialize(DoubleDeserializer.java:21)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:204)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:128)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:305)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:984)
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1591)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:985)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:762)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:613)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575)
14:19:43.085 [test-app-efdaea2c-faab-49e9-a371-c9109d0cc96c-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams -- stream-client [test-app-efdaea2c-faab-49e9-a371-c9109d0cc96c] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
The data I am trying to send looks like this:
test:12.34
test2:123.454
test3:12345.321
- However, when I change Serdes.Double() to Serdes.String() in Consumed.with, the error does not occur:
KStream<String, String> inputTopic = builder.stream("a-topic", Consumed.with(Serdes.String(), Serdes.String()));
- Also, when I change Serdes.Double() to Serdes.String() in Produced.with, I do not get any error:
inputTopic.to("b-topic", Produced.with(Serdes.String(), Serdes.String()));
1 Answer
mbskvtky1#
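"The command I use to produce messages to the Kafka topic"
By default the console producer serializes data as strings, so the topic holds the text you typed (for example "12.34") rather than an 8-byte binary double. That is why the String Serde / deserializer works, while DoubleDeserializer fails with "Size of data received by Deserializer is not 8".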
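To make the size mismatch concrete, here is a small dependency-free sketch. It assumes the typed text reaches the topic as UTF-8 bytes and that a double on the wire is its 8-byte big-endian form, which is what the "is not 8" check in the error implies. The class and method names are made up for illustration and are not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class for illustration only; not a Kafka API.
final class DoubleWireSizeDemo {

    // Bytes stored in the topic when the value was typed as text, e.g. "12.34".
    static byte[] asText(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Bytes stored in the topic when the value was written as a binary double
    // (8 bytes, big-endian), the layout a double deserializer expects.
    static byte[] asBinaryDouble(double value) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    public static void main(String[] args) {
        // Sample values taken from the question.
        for (String v : new String[] {"12.34", "123.454", "12345.321"}) {
            System.out.printf("%s -> text: %d bytes, binary double: %d bytes%n",
                    v, asText(v).length, asBinaryDouble(Double.parseDouble(v)).length);
        }
    }
}
```

The text forms come out at 5, 7 and 9 bytes, while the binary form is always 8 bytes, so none of the text records can pass the deserializer's length check.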
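Clear the topic and start again with appropriate producer settings, so that the values are actually written with a Double serializer.
Alternatively, write a separate runnable producer class instead of using the console tool.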
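A standalone producer along those lines might look like the sketch below. It assumes the kafka-clients library is on the classpath, the broker from the question at localhost:9092, and the Streams input topic "a-topic". The class name DoubleValueProducer and the helper producerProperties are hypothetical, and the sample records follow the data shown in the question.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical class name; a sketch, not the answer author's code.
final class DoubleValueProducer {

    // Producer settings with a String key serializer and a Double value serializer,
    // matching Consumed.with(Serdes.String(), Serdes.Double()) on the Streams side.
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class.getName());
        return props;
    }

    public static void main(String[] args) {
        // Broker address and topic name are assumptions taken from the question.
        try (KafkaProducer<String, Double> producer =
                     new KafkaProducer<>(producerProperties("localhost:9092"))) {
            producer.send(new ProducerRecord<>("a-topic", "test", 12.34));
            producer.send(new ProducerRecord<>("a-topic", "test2", 123.454));
            producer.send(new ProducerRecord<>("a-topic", "test3", 12345.321));
            producer.flush(); // make sure the records are sent before closing
        }
    }
}
```

Records that were already written to the topic as text would still fail to deserialize, which is why the topic should be cleared (or a fresh topic used) before running a producer like this.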