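I'm trying to use a new feature (https://www.confluent.io/blog/put-several-event-types-kafka-topic/) that allows storing two different types of events in the same topic. I'm using Confluent version 4.1.0 and set the following properties to achieve this: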
properties.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY,TopicRecordNameStrategy.class.getName());
properties.put("value.multi.type", true);
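The data is written to the topic without problems and can be read from a Kafka Streams application as generic Avro records. In addition, two new entries were created in the Kafka Schema Registry, one for each event type on that topic.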
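For context, the two registry entries follow from how the subject name strategies build Schema Registry subject names. The sketch below mimics that naming with plain string operations instead of calling the Confluent classes, so it is an illustration rather than the library's code. The record names com.example.OrderCreated and com.example.OrderShipped are hypothetical placeholders, since the thread does not show the actual schemas.

```java
// Sketch of how the subject name strategies derive a Schema Registry subject.
// Plain string logic only; the record names used in main() are hypothetical.
final class SubjectNameSketch {

    // TopicNameStrategy (the default): "<topic>-key" or "<topic>-value".
    static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: the fully qualified Avro record name.
    static String recordName(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordNameStrategy: "<topic>-<fully qualified record name>".
    static String topicRecordName(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        String topic = "source-topic";
        String[] records = {"com.example.OrderCreated", "com.example.OrderShipped"};
        // One subject per event type under TopicRecordNameStrategy.
        for (String record : records) {
            System.out.println(topicRecordName(topic, record));
        }
        // The single subject a reader using the default strategy would expect.
        System.out.println(topicName(topic, false));
    }
}
```

Under these assumptions, a writer using TopicRecordNameStrategy registers one subject per event type, while a reader that falls back to the default strategy would look for source-topic-value, which was never registered.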
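The problem I'm facing is that I can't export this data from the topic using Kafka Connect. In the simplest case, I use a file sink connector configured as follows: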
{
  "name": "sink-connector",
  "config": {
    "topics": "source-topic",
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": 1,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
    "value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
    "file": "/tmp/sink-file.txt"
  }
}
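One detail of this configuration may be worth checking. Kafka Connect hands a converter only the settings that carry its prefix (key.converter. or value.converter.), with the prefix stripped. The sketch below imitates that filtering with a plain map to show which of the settings above would reach the value converter. The class and method names are hypothetical and this is not Connect's own code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: imitate how prefixed connector settings are passed to a converter.
// Hypothetical helper, not part of Kafka Connect.
final class ConverterPrefixSketch {

    // Keep only entries whose key starts with the prefix and strip the prefix.
    static Map<String, String> withPrefixStripped(Map<String, String> config, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Value-related settings copied from the connector configuration in the question.
        Map<String, String> config = new LinkedHashMap<>();
        config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
        config.put("value.converter.schema.registry.url", "http://kafka-schema-registry:8081");
        config.put("value.subject.name.strategy",
                "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");

        // Only the keys that start with "value.converter." are visible to the converter.
        System.out.println(withPrefixStripped(config, "value.converter."));
    }
}
```

With the settings from the question, only schema.registry.url reaches the value converter, because value.subject.name.strategy lacks the value.converter. prefix. The prefixed key value.converter.value.subject.name.strategy is the form that would reach it; whether that alone resolves the error on Confluent 4.1.0 is not confirmed in this thread.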
I get an error from the connector that appears to be some kind of serialization error in the AvroConverter, as shown below:
org.apache.kafka.connect.errors.DataException: source-topic
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 2
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:296)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:125)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:236)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:120)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:83)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
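Note that the Schema Registry holds one Avro schema with id 2 and another with id 3, which describe the two events hosted on the same topic. The same problem occurs when using the JDBC connector.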
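The id in "Error retrieving Avro schema for id 2" comes from the message itself: in the Confluent wire format, each serialized value starts with a magic byte 0 followed by the schema id as a 4-byte big-endian integer. A minimal sketch of reading that id from a raw payload is shown here. The class name and the sample bytes are made up for illustration.

```java
import java.nio.ByteBuffer;

// Sketch: read the schema id from a payload in the Confluent wire format
// (1 magic byte, 4-byte big-endian schema id, then the Avro-encoded data).
final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;

    static int schemaId(byte[] payload) {
        if (payload == null || payload.length < 5) {
            throw new IllegalArgumentException("Payload too short for magic byte and schema id");
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Made-up payload: magic byte, schema id 2, then one byte standing in for Avro data.
        byte[] sample = {0, 0, 0, 0, 2, 0x0a};
        System.out.println(schemaId(sample));
    }
}
```

Checking the id this way on records from the topic can confirm that the messages reference the schemas already in the registry, which would point to the subject lookup rather than the data as the failing step.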
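So how can I handle this case in order to export data from the Kafka cluster to external systems? Is something missing from my configuration? Is it possible at all to have a topic with multiple event types and export them through Kafka Connect?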
1 Answer
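Found a solution. My code was passing the key as a string and the value as Avro. While reading, the Hive sink was trying to find an Avro schema for the key and could not find it. Adding the properties key.converter=org.apache.kafka.connect.storage.StringConverter and key.converter.schema.registry.url=http://localhost:8081 resolved the issue.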