我是Kafka Streams的新手,发现总体来说API很混乱,文档也不容易理解。我正在编写一个简单的流应用程序,如下所示:
1.输入流:key(String)-〉userID,value(String)-〉用户的事务记录JSON字符串。
1.拓扑:聚合上述输入并生成一个KTable〈String,UserAccountBalance〉,Key为userId,value为一个自定义对象,随着聚合的进行而更新。
final KStream<String, String> transactionsInput = streamsBuilder.stream("bank-balance-input-topic");
final KTable<String, UserBankBalance> table =
transactionsInput.groupBy((key, value) -> key)
.aggregate(() -> new UserBankBalance("dummyUserId", 0, "1866-12-23T17:47:37Z"),
(userName, transactionRecordStr, userBankBalance) -> {
// Code does the following:
// 1. Deserialize the transactionRecordStr
// 2. Update the UserBankBalance object.
// return userBankBalance;
});
(默认键、值serdes配置为String)然而,在运行一些健全性测试时,我发现String序列化器与UserBankBalance对象不兼容。
- 为什么mapValues、groupByKey、aggregate等操作需要Serdes?
我的理解是:
- 流库需要使用默认的serdes来具体化对象以更新内部状态?
- 如果发生了重新分区,键、值需要序列化并存储回内部分区以供进一步处理?
- 鉴于上述情况,即使我们只是将
KTable<String,UserBankBalance>
创建为内存中的表示,仍然需要Serdes。
我已经阅读了官方文件和API文件,只是找不到任何好的澄清。
- 为什么Kafka Stream的库不提供一个默认的
ObjectMapperSerdes
,它利用了Jackson的ObjectMapper
(就像这个官方例子)?我想很多用户会有类似的用例,库用户这样做会有重复的努力。
参考文献:
1条答案
按热度按时间jvlzgdj91#
为什么操作需要Serdes
Kafka存储字节。Streams API不将对象从一个操作传递到另一个操作,它使用Kafka作为消息总线。它需要将任何对象序列化为字节以通过网络发送。
如果您正在使用JSON,那么Kafka已经有一种内置的方法来创建JSONSerde;因为可以使用
Serdes.serdeFrom
静态方法,所以不需要ObjectMapper
类路径(同样,它会创建对connect-json
模块的依赖关系,并扩大kafka-streams
类路径)。或者,Spring-Kafka也有
JsonSerde
,Confluent维护AvroSerde
、ProtobufSerde
等,以便与这些工具生成的类一起使用。获取的字符串序列化程序与UserBankBalance对象不兼容
您需要对每个操作使用
Grouped
,Materialized
,Consumed
, orProduced
类的某种组合来覆盖默认的拓扑serde。