I am new to Kafka Streams and have recently been using the Streams DSL for my project. Here is an example of what I want to implement.
before rollup
topic A:-
1(key) -> a(value)
2(key) -> a(value)
3(key) -> b(value)
4(key) -> b(value)
changelog table:-
1(key) -> a(value)
2(key) -> a(value)
3(key) -> b(value)
4(key) -> b(value)
sink topic:-
a(key) -> 1,2(value)
b(key) -> 3,4(value)
after rollup:-
topic A:-
1(key) -> a(value)
2(key) -> a(value)
3(key) -> b(value)
4(key) -> b(value)
2(key) -> b(value)
changelog table:-
1(key) -> a(value)
2(key) -> b(value)
3(key) -> b(value)
4(key) -> b(value)
sink topic:-
a(key) -> 1,2(value)
b(key) -> 3,4(value)
a(key) -> 1(value)
b(key) -> 2,3,4(value)
My approach:-
stream <- streamFrom["topicA"]
ktable <- reduce(stream)((v1, v2) => v2) // tried to keep only the latest value per key in a changelog table
groupedstr <- groupby(ktable.toStream)(key = value) // changed the key to the value
aggregate <- aggregate(groupedstr)(initializer = empty)(add) // aggregated the grouped stream
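For concreteness, here is a rough Java sketch of that pseudocode using the Kafka Streams DSL. This is only a sketch of what I am trying: the String serdes, the comma-joining aggregator, the class name RollupTopology, and the sink topic name "sink-topic" are placeholders, not my actual code.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class RollupTopology {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> stream =
                builder.stream("topicA", Consumed.with(Serdes.String(), Serdes.String()));

        // Step 1: keep only the latest value per key in a changelog-backed table.
        KTable<String, String> latest = stream
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .reduce((oldValue, newValue) -> newValue);

        // Step 2: swap key and value (i.e. group by the value), then collect the
        // original keys for each value into a comma-separated list.
        KTable<String, String> rollup = latest
                .toStream()
                .map((key, value) -> KeyValue.pair(value, key))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .aggregate(
                        () -> "", // initializer: empty list
                        (newKey, origKey, agg) -> agg.isEmpty() ? origKey : agg + "," + origKey,
                        Materialized.with(Serdes.String(), Serdes.String()));

        rollup.toStream().to("sink-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}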
The output of the changelog table after aggregate:
a(key) -> 1,2(value)
b(key) -> 3,4(value)
b(key) -> 2,3,4(value)
Also, in the KTable changelog I cannot find only the updated events; it still contains the previous records as well. Please help me figure out how to achieve this.