在kafka ktablechangelog主题中找不到更新的记录

4xy9mtcn  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(242)

我是新的Kafka流和最近使用streamdsl为我的项目。让我们举一个我想实现的例子。

before rollup
topic A:-

1(key) -> a(value),
2(key) -> a(value),
3(key) -> b(value),
4(key) -> b(value),

changelog table:-

1(key) -> a(value)
2(key) -> a(value)
3(key) -> b(value)
4(key) -> b(value)

sink topic:-
a(key) -> 1,2(value)
b(key) -> 3,4(value)

after rollup:-

topic A:-

1(key) -> a(value)
2(key) -> a(value)
3(key) -> b(value)
4(key) -> b(value)
2(key) -> b(value)

changelog table:-

1(key) -> a(value)
2(key) -> b(value)
3(key) -> b(value)
4(key) -> b(value)

sink topic:-
a(key) -> 1,2(value)
b(key) -> 3,4(value)
a(key) -> 1(value)
b(key) -> 2,3,4(value)

My approach:-

stream <- streamFrom["topicA"]
ktable < reduce(stream)((v,v) => v) // tried to krrp only latest value to a changelog table
groupedstr <- groupby(ktable.toStream)(key = value) // changed the key to value
aggregate  <- aggregate(groupedstr)(initializer = empty)(add) //aggaregated the grouped stream

the output of changelog table afer aggregate:
a(key) -> 1,2(value)
b(key) -> 3,4(value)
b(key) -> 2,3,4(value)

同样在ktable changelog中,我找不到更新的事件,它也包含以前的记录。请帮助我怎样才能做到这一点

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题