当我们通过ksql创建avro消息并尝试使用kafkaconnect来使用它们时,会发生一些奇怪的事情。一点背景:
源数据第三方提供商正在我们的kafka集群上以json格式生成数据(到目前为止,还不错)。我们实际上看到了数据。
数据转换由于我们的内部系统要求数据在avro中编码,我们创建了一个ksql集群,通过在ksql中创建以下流将传入数据转换为avro:
{
"ksql": "
CREATE STREAM src_stream (browser_name VARCHAR)
WITH (KAFKA_TOPIC='json_topic', VALUE_FORMAT='JSON');
CREATE STREAM sink_stream WITH (KAFKA_TOPIC='avro_topic',VALUE_FORMAT='AVRO', PARTITIONS=1, REPLICAS=3) AS
SELECT * FROM src_stream;
",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
(到目前为止,还不错)
随着偏移量的增加,我们看到数据从json主题生成到avro主题。
然后在(新的)kafka connect集群中创建一个kafka连接器。在某些上下文中,我们使用多个kafka connect集群(这些集群具有相同的属性),因此,我们有一个kafka connect集群运行此数据,但有一个集群的精确副本用于其他avro数据(1个用于分析,1个用于我们的业务数据)。
此连接器的接收器是bigquery,我们使用的是wepay bigquery接收器连接器1.2.0。同样,到目前为止,一切都很好。我们的业务集群使用这个连接器运行良好,业务集群上的avro主题正在流入bigquery。
但是,当我们尝试使用前面由ksql语句创建的avro主题时,我们看到抛出了一个异常:/
例外情况如下:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: dpt_video_event-created_v2
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:98)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:415)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:408)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:123)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:190)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:169)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:243)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:134)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
对我们来说,这表示kafka connect正在读取消息,解码avro并尝试从schema注册表中获取id为0的schema。显然,schema注册表中的schema id总是>0。
我们目前正努力找出问题所在。看起来ksql正在对架构id为0的消息进行编码,但找不到原因:/
感谢您的帮助!
比尔,帕特里克
更新:我们已经为avro消息实现了一个基本的使用者,并且该使用者正确地标识了avro消息(id:3)中的模式,因此它似乎被改为kafka connect,而不是实际的ksql/avro消息。
1条答案
按热度按时间b4qexyjb1#
显然,模式注册表中的模式ID总是>0。。。看起来ksql正在对架构id为0的消息进行编码,但找不到原因
avroconverter执行一个“哑检查”,只检查消耗的字节是否以
0x0
. 接下来的4个字节是id。如果您正在使用
key.converter=AvroConverter
你的钥匙一开始就像0x00000
如果使用十六进制,那么在日志中id将显示为0,并且查找将失败。上次我检查时,ksql没有以avro格式输出键,所以您需要检查连接器的属性。