我尝试Kafka流,设法流正常字符串从一个主题到另一个,但现在我想把它作为AVRO消息序列化。我环顾互联网的解决方案,但我没有找到一个,所以这是我最后的希望。这是我的代码
public static void main(String[] args) throws IOException {
String url = "http://url:9092";
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "TestAvro222");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, url);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
//props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
props.put("schema.registry.url", url);
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
url);
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream
("topic1",
Consumed.with(stringSerde, stringSerde));
File file = new File("urltoschema");
Schema schema = new Schema.Parser().parse(file);
KTable<String, GenericRecord> textLines_table = textLines.mapValues(value -> avroMaker(schema, value)).toTable();
//KTable textLines_table = textLines.toTable();
textLines_table.toStream().to("TEST-AVRO22", Produced.with(Serdes.String(), valueGenericAvroSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
当我运行它,我得到以下错误:
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic TEST-AVRO22 for task 0_0 due to:
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:177)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:152)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1296)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer.serialize(GenericAvroSerializer.java:63)
at io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer.serialize(GenericAvroSerializer.java:39)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:157)
... 28 common frames omitted
Caused by: java.net.SocketException: Connection reset
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:787)
at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:722)
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:896)
at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:722)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1615)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1520)
at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:561)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:549)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:290)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:397)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:376)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:364)
at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.register(SchemaRegistryClient.java:51)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
... 33 common frames omitted
不知道出了什么问题,我搜索了多个网站寻找解决方案,都无济于事。请帮助
1条答案
按热度按时间juzqafwq1#
是的,我发现我的模式注册表有一个与我在代码中提供的URL不同的URL,所以修复了它