我正在寻找关于如何在使用pulsar/kafka的解决方案中处理以下问题的建议。
场景是:生产者发送消息(json格式),消费者接收消息并将数据插入数据库表(在消息中指定)。
突然,生产者改变了消息中发送的数据的结构(比如说,因为数据库中的表结构有一个新的列)。
因此,队列有一段时间具有旧数据结构的消息,现在开始接收具有新数据结构的消息。
我的疑问是消费者应该如何处理这种情况。如何处理旧结构的消息,这些消息现在无效,因为它们无法插入数据库表中,因为表结构已更改。重试,然后永久失败(死信q?)。
另外,您通常选择将元数据与消息一起发送,还是通常以单独的主题或其他形式处理此问题。
谢谢你的建议
1条答案
按热度按时间wwodge7n1#
所描述的问题主要与外部系统有关,这将需要在生产者端进行某种类型的把门保持/预验证,生产者端知道如何使用数据来防止这种情况发生。不幸的是,这引入了紧耦合,因此如果没有紧耦合,您就必须显式地编写使用者代码,以便进行健壮的消息转换和异常处理,可能包括一种版本号或显式的模式,如confluent schema registry提供的每个消息(也可能是pulsar的模式注册表功能)