如果不同消息中的引用号(refNo)相同,我需要合并有效负载。我的限制是我只能使用KTable
,如果键是偶数,我不需要合并有效负载。另外,传入消息的顺序不应该改变结果。
例如,如果我们有一个空主题,传入的消息为:
1: { key: "1", value: {refNo:1, payload:{data1}} }
2: { key: "2", value: {refNo:1, payload:{data2}} }
3: { key: "3", value: {refNo:2, payload:{data3}} } // this one should be not effected and left how it is
预期结果:
1: { key: "1", value: {refNo:1, payload:{data1, data2}} }
2: { key: "2", value: {refNo:1, payload:{data2}} }
3: { key: "3", value: {refNo:2, payload:{data3}} }
我能想到的唯一方法是使用两次.groupBy
并再次加入到原始主题中。
1.首先将键更改为refNo
,将键保存到值本身,然后在聚合期间加入有效负载。
1.其次.groupBy
还原键到初始状态。
1.最后一步将所有内容都加入到原始主题中,因为我在分组时丢失了一条消息。
我很确定有一个更简单的方法来解决这个问题。什么是最优化和优雅的方法来解决这个问题?
编辑:其下游有输出题目,原文不编辑。
2条答案
按热度按时间vjrehmav1#
在KSQL中进行聚合可能正好可以实现这一点。
唯一的问题是你需要多大的窗口。你需要多久连接一次数据?
而且,感觉写回原来的主题就像一个大大的“不”。
1.您只是稍微更改了消息格式,其他下游使用者可能已经期望特定的消息格式。
1.写一个新的主题似乎是一个更好的途径,它还减少了额外的消费者需要消费的消息量。
gfttwv5a2#
目前我正在使用这个解决方案。它工作,但我不知道它将如何执行,或者它可以更优化,或者是否有更好的方法来解决我的问题。