org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error

zzoitvuj · asked 2023-11-16 in Apache · 1 answer · 176 views

I'm trying to configure the Confluent S3 Sink Connector to consume messages from a topic in an MSK cluster and write them to Parquet files on S3. It needs a schema registry such as the Confluent Schema Registry, but that requires Confluent Platform and the Confluent CLI, so I saw that the AWS Glue Schema Registry can be used instead: https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/
The problem is that I keep getting the following exception in the logs:

Error converting message value in topic 'events' partition 0 at offset 2012214 and timestamp 1697576912985: Converting byte[] to Kafka Connect data failed due to serialization error

I created a schema registry in AWS named testregistry1 and then tried the following configuration:

"connector.class" = "io.confluent.connect.s3.S3SinkConnector"
"key.converter"                                 = "org.apache.kafka.connect.storage.StringConverter"
"key.converter.schemas.enable"                  = "true"
"key.converter.avroRecordType"                  = "GENERIC_RECORD"
"key.converter.region"                          = "eu-central-1"
"key.converter.registry.name"                   = "testregistry1"
"key.converter.schemaAutoRegistrationEnabled"   = "true"
"value.converter"                               = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
"value.converter.schemas.enable"                = "true"
"value.converter.avroRecordType"                = "GENERIC_RECORD"
"value.converter.region"                        = "eu-central-1"
"value.converter.registry.name"                 = "testregistry1"
"value.converter.schemaAutoRegistrationEnabled" = "true"    

# S3
"topics"               = "events"
"flush.size"           = 10
"s3.region"            = eu-central-1
"s3.bucket.name"       = "my_bucket"
"s3.part.size"         = 26214400
"storage.class"        = "io.confluent.connect.s3.storage.S3Storage"
"format.class"         = "io.confluent.connect.s3.format.parquet.ParquetFormat"
"partitioner.class"    = "io.confluent.connect.storage.partitioner.DefaultPartitioner"
"schema.compatibility" = "NONE"

# Authorization
"security.protocol"                                           = "SASL_SSL"
"sasl.mechanism"                                              = "AWS_MSK_IAM"
"sasl.jaas.config"                                            = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"sasl.client.callback.handler.class"                          = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"confluent.topic.consumer.security.protocol"                  = "SASL_SSL"
"confluent.topic.consumer.sasl.mechanism"                     = "AWS_MSK_IAM"
"confluent.topic.consumer.sasl.jaas.config"                   = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"confluent.topic.consumer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"confluent.topic.producer.security.protocol"                  = "SASL_SSL"
"confluent.topic.producer.sasl.mechanism"                     = "AWS_MSK_IAM"
"confluent.topic.producer.sasl.jaas.config"                   = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"confluent.topic.producer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"


I also tried setting AWSKafkaAvroConverter as the key.converter, but I got the same error.
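For comparison, this is roughly how records would have to be produced for AWSKafkaAvroConverter to be able to decode them: the matching Glue Schema Registry serializer prefixes each record with a Glue header byte plus the schema version id. A minimal sketch, assuming the software.amazon.glue:schema-registry-serde dependency; the broker address is a placeholder:

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import software.amazon.awssdk.services.glue.model.DataFormat;

public class GlueAvroProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker>:9098"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // This serializer writes the Glue header byte and schema version id
        // in front of the Avro payload, which AWSKafkaAvroConverter looks for.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  GlueSchemaRegistryKafkaSerializer.class.getName());
        props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
        props.put(AWSSchemaRegistryConstants.AWS_REGION, "eu-central-1");
        props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "testregistry1");
        props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE,
                  AvroRecordType.GENERIC_RECORD.getName());

        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"test\",\"fields\":["
            + "{\"name\":\"field1\",\"type\":\"string\"},"
            + "{\"name\":\"field2\",\"type\":\"int\"}]}");
        GenericRecord value = new GenericData.Record(schema);
        value.put("field1", "test4578_01");
        value.put("field2", 1);

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", value));
        }
    }
}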
Full log:

[2023-10-18 10:55:24,825] ERROR Executing stage 'VALUE_CONVERTER' with class 'com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter', where consumed record is {topic='events', partition=6, offset=2316078, timestamp=1697626522632, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:118)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.
    at com.amazonaws.services.schemaregistry.deserializers.SecondaryDeserializer.deserialize(SecondaryDeserializer.java:65)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:150)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
    at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:116)
    ... 18 more
[2023-10-18 10:55:24,825] ERROR Error converting message value in topic 'events' partition 6 at offset 2316079 and timestamp 1697626522815: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
(the same DataException and stack trace repeat for each subsequent record)
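The "Didn't find secondary deserializer." line is telling: judging from the stack trace, the Glue deserializer inspects the header version byte of each record, and when it is not a Glue Schema Registry header it hands the record to an optional "secondary" deserializer, throwing this exception when none is configured. In other words, the records in the topic were apparently never written by the Glue serializer. For the case where a topic actually contains Confluent-serialized records, the Glue serde documents a secondaryDeserializer property for migrations, which in this connector's configuration style would presumably look like:

"value.converter.secondaryDeserializer" = "io.confluent.kafka.serializers.KafkaAvroDeserializer"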


Update:
I have since replaced the Glue Schema Registry with a Confluent Schema Registry on an EC2 instance and registered a schema as described in https://docs.confluent.io/platform/current/schema-registry/develop/api.html#example-requests-format-and-valid-json:

curl -X POST http://localhost:8081/subjects/test/versions -H 'Content-Type: application/json' -H 'Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json' -d '{"schema":"{\"type\": \"record\",\"name\": \"test\",\"fields\":[{\"type\": \"string\",\"name\": \"field1\"},{\"type\": \"int\",\"name\": \"field2\"}]}"}'
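Note: a POST without a "schemaType" field registers the schema with type AVRO by default. The registration can be double-checked against the same host, for example:

curl http://localhost:8081/subjects/test/versions
curl http://localhost:8081/subjects/test/versions/latest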


The schema gets created, but in my connector, after setting the following properties:

"key.converter"                       = "org.apache.kafka.connect.storage.StringConverter"
"value.converter"                     = "io.confluent.connect.json.JsonSchemaConverter"
"value.converter.schemas.enable"      = false
"value.converter.avroRecordType"      = "GENERIC_RECORD"
"value.converter.dataFormat"          = "JSON"
"value.converter.schema.registry.url" = "http://<ip>:8081"


I still get the Converting byte[] to Kafka Connect data error, but this time it is caused by the magic byte:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!


The message I'm trying to send is simple; with ./kafka-console-producer.sh I send this to my topic:

{"field1":"test4578_01","field2":1}

wrrgggsh answered:

> The message I'm trying to send is simple; with ./kafka-console-producer.sh I send this to my topic
That is not JSON Schema, it is just plain JSON, so it never interacts with any registry-backed Converter class; that is why you see "Unknown magic byte!": the data was not serialized in the format the converter expects.
ParquetFormat requires a structured event with a schema (specifically, it uses Avro to build the Parquet files via the parquet-avro Java package).
https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
I personally haven't tested JSON -> Parquet, only Avro -> JSON and Avro -> Parquet, but as the blog above shows, JSON events either need to look like {"schema": ..., "payload": ...} and be read with org.apache.kafka...JsonConverter plus the schemas.enable=true property (the default), or you need to produce them with the kafka-json-schema-console-producer CLI from Confluent Platform and read them with io.confluent...JsonSchemaConverter plus schema.registry.url.
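Concretely (untested against your setup; values are illustrative), the first option means sending an envelope like this with the plain console producer and reading it with org.apache.kafka.connect.json.JsonConverter:

{"schema":{"type":"struct","name":"test","optional":false,"fields":[{"field":"field1","type":"string","optional":false},{"field":"field2","type":"int32","optional":false}]},"payload":{"field1":"test4578_01","field2":1}}

The second option means producing with the registry-aware console producer, so the magic byte and schema id get written in front of the payload, and reading with io.confluent.connect.json.JsonSchemaConverter (flags may vary by Confluent version):

kafka-json-schema-console-producer \
  --bootstrap-server localhost:9092 \
  --topic events \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"object","properties":{"field1":{"type":"string"},"field2":{"type":"integer"}}}'
# each line typed afterwards becomes one record, e.g. {"field1":"test4578_01","field2":1}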
