在我的事件驱动项目中,我有类型为 Commands
作为回应,我有 Events
.
这些 Commands
以及 Events
消息表示域,因此它们包含域中的复杂类型。
例子:
RegisterClientCommand(Name, Email)
ClientRegisteredEvent(ClientId)
域中还有几十个这样的命令和事件对。
我在想这样的事情:
RawMessage(payloadMap, sequenceId, createdOn)
有效负载将包含消息域类类型名称和消息字段。
我也读过关于avro格式的文章,但是定义每条消息的消息格式似乎要做很多工作。
在通过Kafka经纪人实际传输的消息格式方面,最佳实践是什么?
1条答案
按热度按时间7nbnzgx91#
没有单一的“最佳”方法可以做到这一点,它将完全取决于您的团队/组织的专业知识,以及您的项目的具体要求。
Kafka本身对信息的实际内容漠不关心。大多数时候,它只是将消息值和键看作不透明的字节数组。
不管你最终定义了什么
RawMessage
在java方面,它必须被序列化为字节数组才能生成kafka,因为这就是为什么KafkaProducer
要求。也许它是您已经拥有的自定义字符串序列化程序,也许您可以使用jackson或类似的工具将pojo序列化为json。或者您只需发送一个巨大的逗号分隔字符串作为消息。完全由你决定。重要的是,当消费者从kafka主题中提取消息时,他们能够正确可靠地读取消息中每个字段的数据,而不会出现任何错误、版本冲突等。现有的大多数serde/schema机制(如avro、protobuf或thrift)都会使这项工作更容易。特别复杂的事情,比如确保新消息与同一消息的先前版本向后兼容。
大多数人最终会有以下几种情况:
serde机制用于创建字节数组以生成kafka,一些流行的机制有avro、protobuf、thrift。
原始json字符串
一个巨大的字符串,具有某种内部/自定义格式,可以进行解析/分析。
一些公司使用集中式模式服务。这样,您的数据使用者就不必提前知道消息包含什么模式,他们只需下拉消息,并从服务请求相应的模式。confluent有自己的自定义模式注册解决方案,多年来一直支持avro,几周前,它正式支持protobuf。这不是必需的,如果您拥有端到端的生产者/消费者,您可能会决定自己处理序列化,但是很多人已经习惯了。
根据消息类型的不同,有时您需要压缩,因为消息可能非常重复和/或很大,因此如果您发送压缩消息,最终会节省相当多的存储和带宽,代价是一些cpu使用率和延迟。这也可以由您自己在生产者/消费者端处理,在字节数组序列化之后压缩它们,或者您可以直接在生产者端请求消息压缩(查找
compression.type
在Kafka文件中)。