java—使用特定的id将架构添加到架构注册表

snvhrwxg  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(312)

我们已经在kafkastreams中使用confluent schema registry一年多了,一切都运行得很好;直到昨天。
在uat环境中,我们似乎删除了一个模式主题,并且我们的一个应用程序开始使用消息进行故障转移
[错误]logandfailexceptionhandler-反序列化过程中捕获异常,taskid:0\u 13,主题:主题名称,分区:13,偏移量:0 org.apache.kafka.common.errors.serializationexception:检索id 1531的avro架构时出错
我检查了schema注册表,发现主题丢失,并用curl查询错误中列出的id 1531,例如:

curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531

然后回来了:

{"error_code":40403,"message":"Schema not found"}

我天真地只是尝试再次注册模式而没有考虑它,它工作了,但该模式注册的id与以前的1531 id不同。
我需要注册到ID1531的模式,因为主题中现有的消息已经在魔法字节中包含了ID1531。
我查了api文档https://docs.confluent.io/current/schema-registry/docs/develop/api.html 但没有看到为模式设置给定id的任何内容。
是否有任何方法可以使用schema registry将schema强制为特定的id?
我知道一些备份解决方案,但我现在正在寻找一个修复,希望能防止数据丢失或非常措施,以修复主题数据。

3qpi33ja

3qpi33ja1#

是否有任何方法可以使用schema registry将schema强制为特定的id?
没有。
1531的id实际上并没有“消失”,顺便说一下,它只是在注册表中被标记为已删除(使用 _schemas 主题)。
据我所知,当你使用Kafka夫罗德塞利泽时,真的没有办法避免这个错误。您必须使用bytearraydeserializer,然后使用schema registry客户端“修复”或“查找”正确的id,然后反序列化消息的其余部分。
另一个选项是重置用户组,以便完全跳过这些消息,或者设置异常处理。使用kafka的streams api处理错误消息

相关问题