我正在写我自己的SMT,做一些特定领域的清洗。
代码结构简单明了,已成功编译并添加到plugin.path
中。
创建具有以下配置的连接器时
{
"name": "sql-to-kafka",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "127.0.0.1",
"database.port": "3306",
"database.user": "username",
"database.password": "password",
"database.server.id": "11111",
"database.include.list": "test",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schemahistory.localdb",
"include.schema.changes": "false",
"database.encrypt": false,
"table.include.list": "test.bins",
"topic.prefix":"localdb",
"transforms":"unwrap,MyCustomSMT",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": false,
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.MyCustomSMT.type": "MyCustomSMT$Value",
"transforms.MyCustomSMT.field": "segment"
}
}
我在连接器日志中收到以下异常
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:271)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic localdb.test.bins :
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"Key","namespace":"localdb.test.bins","fields":[{"name":"id","type":"int"}],"connect.name":"localdb.test.bins.Key"}
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
... 15 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Leader not known.; error code: 50004
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:544)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:532)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:490)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:257)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:366)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:337)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:115)
... 17 more
不知道为什么Avro序列化在这里抱怨,也值得一提的是,当我更新连接器配置和修改下面的配置,以使用JSONConverter的值和StringConverter的键,一切工作正常,不知道我错过了这里。
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false"
2条答案
按热度按时间pbpqsu0x1#
查看以下网址:
https://debezium.io/documentation/reference/stable/configuration/avro.html#avro-serialization
在帖子备注中:
“如果希望使用JSON序列化记录,请考虑将以下连接器配置属性设置为false:
key.converter.schemas.enable
value.converter.schemas.enable
将这些属性设置为false将从每个记录中排除详细架构信息。”
“要使用Apache Avro序列化,必须部署管理Avro消息架构及其版本的架构注册表。可用选项包括Apicurio API和架构注册表以及Confluent架构注册表。此处对两者进行了描述。”
这就解释了这种行为。
a14dhokn2#
我的问题的解决方法是从头开始重新创建本地环境,其中不再出现模式注册表异常日志,没有其他与连接器或SMT配置相关的内容。