如何跟踪主题中两个事件之间的变化?

n7taea2i  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(174)

我的 STORE_TOPIC 是一个压缩主题,它包含实体的当前状态,基于从中消耗的事件 EVENTS_TOPIC (Kafka流应用程序)。主要目的是 STORE_TOPIC 将作为(全局)ktable加载。

EVENTS_TOPIC                    |    STORE_TOPIC
                                |    
VALUE                           |     KEY      VALUE
{"entity":"A", "a":1, "b":2}    |     A        {"a":1, "b":2}
{"entity":"B", "c":3}           |     B        {"c":3}
{"entity":"A", "d":4}           |     A        {"a":1, "b":2, "d":4}
{"entity":"A", "a":0, "e":5}    |     A        {"a":0, "b":2, "d":4, "e":5}

当定义的属性子集发生更改时,新使用者需要得到通知。
例如,如果我们决定跟踪属性“a”、“b”和“c”的更改,则预期的输出将是:

A        {"a":1, "b":2}
B        {"c":3}
(Nothing)
A        {"a":0, "b":2}

为此,我编写了一个新的Kafka流应用程序:
处理包含每个键所需属性的“本地存储”
荷载 STORE_TOPIC 像一条小溪
将传入的消息与“本地存储”的内容进行比较(感谢 .transform() )
如果检测到更改,则将传入消息(仅需要跟踪的属性)写入新的 OUT_TOPIC 并在“本地存储”中添加/替换条目
有没有一个更简单,更优雅的方法,来实现这一点?
你能确认一下如果我装上 STORE_TOPIC 直接作为ktable: .table(STORE_TOPIC) ,我只能访问实体的当前状态,而无法访问实体的早期版本并对其进行比较?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题