kafka接收器连接器中的avro反序列化问题

qij5mzcb  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(325)

我尝试使用kafka从db2读取数据,然后将其写入hdfs。我使用带有标准jdbc和hdfs连接器的分布式汇合平台。由于hdfs连接器需要知道模式,因此需要avro数据作为输入。因此,我必须为提供给kafka的数据指定以下avro转换器(在etc/kafka/connect distributed.properties中):

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

然后运行jdbc连接器,并与控制台avro consumer检查是否可以成功读取从db2获取的数据。
但是,当我启动hdfs连接器时,它就不再工作了。相反,它会输出serializationexception:

Error deserializing Avro message for id -1
... Unknown magic byte!

为了检查这是否是hdfs连接器的问题,我尝试使用一个简单的filesink连接器。但是,我在使用filesink时看到了完全相同的异常(文件本身被创建了,但仍然是空的)。
然后,我进行了以下实验:我没有使用avro转换器作为键和值,而是使用json转换器:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schema.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.enable=false

这修复了filesink连接器的问题,即从db2到文件的整个管道工作正常。然而,对于hdfs连接器,这种解决方案是不可行的,因为连接器需要模式,因此需要avro格式作为输入。
我觉得接收器连接器中avro格式的反序列化没有正确实现,因为控制台avro使用者仍然可以成功读取数据。有人知道这种行为的原因吗?我也希望能有一个简单的解决办法!

fkaflof6

fkaflof61#

与控制台avro consumer确认我可以成功读取获取的数据
我猜你没有加上 --property print.key=true --from-beginning 当你这么做的时候。
最新的值可能是avro,但是connect显然在这个主题的某个地方失败了,所以您需要扫描它以找出发生这种情况的地方
如果使用 JsonConverter 工作正常,而且数据实际上是磁盘上可读的json,那么听起来像jdbc连接器实际上写的是json,而不是avro
如果能够确定错误消息的偏移量,则可以使用设置了连接器组id的常规控制台使用者,然后添加 --max-messages 以及指定用于跳过这些事件的分区和偏移量

相关问题