I'm trying to configure the Confluent S3 Sink Connector to consume messages from a topic in an MSK cluster and write them as Parquet files to S3. It requires a schema registry such as the Confluent Schema Registry, but that in turn requires Confluent Platform and the Confluent CLI, so I saw that the AWS Glue Schema Registry can be used instead: https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/
The problem is that I keep getting the following exception in the logs:
Error converting message value in topic 'events' partition 0 at offset 2012214 and timestamp 1697576912985: Converting byte[] to Kafka Connect data failed due to serialization error
I created a schema registry in AWS named testregistry1 and then tried this configuration:
"connector.class" = "io.confluent.connect.s3.S3SinkConnector"
"key.converter" = "org.apache.kafka.connect.storage.StringConverter"
"key.converter.schemas.enable" = "true"
"key.converter.avroRecordType" = "GENERIC_RECORD"
"key.converter.region" = "eu-central-1"
"key.converter.registry.name" = "testregistry1"
"key.converter.schemaAutoRegistrationEnabled" = "true"
"value.converter" = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
"value.converter.schemas.enable" = "true"
"value.converter.avroRecordType" = "GENERIC_RECORD"
"value.converter.region" = "eu-central-1"
"value.converter.registry.name" = "testregistry1"
"value.converter.schemaAutoRegistrationEnabled" = "true"
# S3
"topics" = "events"
"flush.size" = 10
"s3.region" = eu-central-1
"s3.bucket.name" = "my_bucket"
"s3.part.size" = 26214400
"storage.class" = "io.confluent.connect.s3.storage.S3Storage"
"format.class" = "io.confluent.connect.s3.format.parquet.ParquetFormat"
"partitioner.class" = "io.confluent.connect.storage.partitioner.DefaultPartitioner"
"schema.compatibility" = "NONE"
# Authorization
"security.protocol" = "SASL_SSL"
"sasl.mechanism" = "AWS_MSK_IAM"
"sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"confluent.topic.consumer.security.protocol" = "SASL_SSL"
"confluent.topic.consumer.sasl.mechanism" = "AWS_MSK_IAM"
"confluent.topic.consumer.sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"confluent.topic.consumer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"confluent.topic.producer.security.protocol" = "SASL_SSL"
"confluent.topic.producer.sasl.mechanism" = "AWS_MSK_IAM"
"confluent.topic.producer.sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"confluent.topic.producer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
I also tried setting AWSKafkaAvroConverter as the key.converter, but I got the same error.
Full log:
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] [2023-10-18 10:55:24,825] ERROR Executing stage 'VALUE_CONVERTER' with class 'com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter', where consumed record is {topic='events', partition=6, offset=2316078, timestamp=1697626522632, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:118)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.lang.Thread.run(Thread.java:829)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.SecondaryDeserializer.deserialize(SecondaryDeserializer.java:65)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:150)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:116)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] ... 18 more
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] [2023-10-18 10:55:24,825] ERROR Error converting message value in topic 'events' partition 6 at offset 2316079 and timestamp 1697626522815: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:118)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.lang.Thread.run(Thread.java:829)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.SecondaryDeserializer.deserialize(SecondaryDeserializer.java:65)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:150)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:116)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] ... 18 more
Update:
I have since replaced the Glue Schema Registry with a Confluent Schema Registry running on an EC2 instance: https://docs.confluent.io/platform/current/schema-registry/develop/api.html#example-requests-format-and-valid-json
curl -X POST http://localhost:8081/subjects/test/versions -H 'Content-Type: application/json' -H 'Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json' -d '{"schema":"{\"type\": \"record\",\"name\": \"test\",\"fields\":[{\"type\": \"string\",\"name\": \"field1\"},{\"type\": \"int\",\"name\": \"field2\"}]}"}'
The schema gets created, but in my connector, after trying to set the following properties:
"key.converter" = "org.apache.kafka.connect.storage.StringConverter"
"value.converter" = "io.confluent.connect.json.JsonSchemaConverter"
"value.converter.schemas.enable" = false
"value.converter.avroRecordType" = "GENERIC_RECORD"
"value.converter.dataFormat" = "JSON"
"value.converter.schema.registry.url" = "http://<ip>:8081"
I still get the "Converting byte[] to Kafka Connect data failed" error, but this time it is caused by the magic byte:
2023-10-25T16:39:35.000+02:00 [Worker-00b8ad4cf64746b30] Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
2023-10-25T16:39:35.000+02:00 [Worker-00b8ad4cf64746b30] Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
The message I'm trying to send is simple; with ./kafka-console-producer.sh I send the following to my topic:
{"field1":"test4578_01","field2":1}
1 Answer
> The message I'm trying to send is simple; with ./kafka-console-producer.sh I send the following to my topic
That is not JSONSchema data, it is just plain JSON, so it never interacted with any registry-backed converter class. Registry-serialized records start with a magic byte and a schema ID, which plain JSON lacks, so you see "Unknown magic byte!" because the data was not serialized in the format your converter expects.
ParquetFormat requires a structured record with a schema (specifically, it uses Avro to build the Parquet files via the parquet-avro Java library). https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
I personally haven't tested JSON -> Parquet, only Avro -> JSON and Avro -> Parquet, but as the blog above shows, with org.apache.kafka.connect.json.JsonConverter and its schemas.enable=true property (the default), each JSON event needs to be wrapped in an envelope of the form {"schema": ..., "payload": ...} (see the sketch below).
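As a rough, untested sketch of that envelope for your test message (the struct name and the int32 field type are my assumptions based on the schema you registered), each record produced to the topic would have to look like:

{
  "schema": {
    "type": "struct",
    "name": "test",
    "optional": false,
    "fields": [
      { "field": "field1", "type": "string" },
      { "field": "field2", "type": "int32" }
    ]
  },
  "payload": { "field1": "test4578_01", "field2": 1 }
}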
Otherwise, you need to produce the messages with the kafka-json-schema-console-producer CLI from Confluent Platform and configure io.confluent.connect.json.JsonSchemaConverter plus schema.registry.url on the connector, as in the sketch after this paragraph.
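A minimal sketch of the JSONSchema route, assuming Confluent Platform is available on the EC2 instance; <broker> and <ip> are placeholders, the value.schema is a JSON Schema I wrote to match your test message, and exact flag names can differ slightly between Confluent Platform versions:

kafka-json-schema-console-producer \
  --bootstrap-server <broker>:9092 \
  --topic events \
  --property schema.registry.url=http://<ip>:8081 \
  --property value.schema='{"type":"object","properties":{"field1":{"type":"string"},"field2":{"type":"integer"}},"required":["field1","field2"]}'
# then type plain JSON records on stdin, e.g. {"field1":"test4578_01","field2":1}

Records produced this way carry the magic byte and schema ID, so the io.confluent.connect.json.JsonSchemaConverter + schema.registry.url settings from your update should then be able to deserialize them.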