在kafka streams应用程序中,我们有一个ktable(不是kstream),我们希望在其中计算一些统计信息,例如满足某个属性的所有“rows”/项的最小值或最大值。因此,我们通过将grouping属性设置为key,将ktable转换为kgroupedtable。使用这个分组表,现在可以简单地计算count或sum之类的值。我们只需要使用 aggregate
具有特定适当加法器和减法器函数的方法。(+/-1表示计数,+/-值表示总和)。
然而,对于像min/max这样的聚合,没有这样简单的减法函数。实现最小/最大聚合的一个解决方案是将值聚合到类似于map的对象上,其中加法器函数向map中添加值,而减法则从map中删除值。在连续的一步中,我们可以 map
只需在条目上迭代,就可以Map到最小/最大值。
// Example without types and serdes
KTable sums = myKTable.groupBy((k, v) -> KeyValue.pair(v.getProperty(), v)
.aggregate(() -> 0, (k, v, a) -> a + v.getValue(), , (k, v, a) -> a - v.getValue();
KTable mins = myKTable.groupBy((k, v) -> KeyValue.pair(v.getProperty(), v)
.aggregate(() -> Map.of(), (k, v, a) -> a.put(v.getId(), v.getValue()), , (k, v, a) -> a.remove(v.getId())
.mapValues((k, v) -> StatsHelper.min(v));
然而,这并不是很顺利,因为我们将不得不序列化整个Map及其所有条目的整个时间。有没有更好的方法来实现这一点,即直接通过框架获取所有相关的“行”?或者这根本不可行?
暂无答案!
目前还没有任何答案,快来回答吧!