Kafka Connect定制SMT arvo序列化异常

r8uurelv  于 2023-02-28  发布在  Apache
关注(0)|答案(2)|浏览(170)

我正在写我自己的SMT,做一些特定领域的清洗。
代码结构简单明了,已成功编译并添加到plugin.path中。
创建具有以下配置的连接器时

{
    "name": "sql-to-kafka", 
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "database.hostname": "127.0.0.1", 
        "database.port": "3306", 
        "database.user": "username", 
        "database.password": "password", 
        "database.server.id": "11111", 
        "database.include.list": "test", 
        "schema.history.internal.kafka.bootstrap.servers": "localhost:9092", 
        "schema.history.internal.kafka.topic": "schemahistory.localdb", 
        "include.schema.changes": "false",
        "database.encrypt": false,
        "table.include.list": "test.bins",
        "topic.prefix":"localdb",
        "transforms":"unwrap,MyCustomSMT",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": false,
        "transforms.unwrap.delete.handling.mode": "drop",
        "transforms.MyCustomSMT.type": "MyCustomSMT$Value",
        "transforms.MyCustomSMT.field": "segment"
    }
}

我在连接器日志中收到以下异常

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:271)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic localdb.test.bins :
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
    ... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"Key","namespace":"localdb.test.bins","fields":[{"name":"id","type":"int"}],"connect.name":"localdb.test.bins.Key"}
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
    ... 15 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Leader not known.; error code: 50004
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:544)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:532)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:490)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:257)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:366)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:337)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:115)
    ... 17 more

不知道为什么Avro序列化在这里抱怨,也值得一提的是,当我更新连接器配置和修改下面的配置,以使用JSONConverter的值和StringConverter的键,一切工作正常,不知道我错过了这里。

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable":"false"
pbpqsu0x

pbpqsu0x1#

查看以下网址:
https://debezium.io/documentation/reference/stable/configuration/avro.html#avro-serialization
在帖子备注中:
“如果希望使用JSON序列化记录,请考虑将以下连接器配置属性设置为false:
key.converter.schemas.enable
value.converter.schemas.enable
将这些属性设置为false将从每个记录中排除详细架构信息。”
“要使用Apache Avro序列化,必须部署管理Avro消息架构及其版本的架构注册表。可用选项包括Apicurio API和架构注册表以及Confluent架构注册表。此处对两者进行了描述。”
这就解释了这种行为。

a14dhokn

a14dhokn2#

我的问题的解决方法是从头开始重新创建本地环境,其中不再出现模式注册表异常日志,没有其他与连接器或SMT配置相关的内容。

相关问题