我的 STORE_TOPIC
是一个压缩主题,它包含实体的当前状态,基于从中消耗的事件 EVENTS_TOPIC
(Kafka流应用程序)。主要目的是 STORE_TOPIC
将作为(全局)ktable加载。
EVENTS_TOPIC | STORE_TOPIC
|
VALUE | KEY VALUE
{"entity":"A", "a":1, "b":2} | A {"a":1, "b":2}
{"entity":"B", "c":3} | B {"c":3}
{"entity":"A", "d":4} | A {"a":1, "b":2, "d":4}
{"entity":"A", "a":0, "e":5} | A {"a":0, "b":2, "d":4, "e":5}
当定义的属性子集发生更改时,新使用者需要得到通知。
例如,如果我们决定跟踪属性“a”、“b”和“c”的更改,则预期的输出将是:
A {"a":1, "b":2}
B {"c":3}
(Nothing)
A {"a":0, "b":2}
为此,我编写了一个新的Kafka流应用程序:
处理包含每个键所需属性的“本地存储”
荷载 STORE_TOPIC
像一条小溪
将传入的消息与“本地存储”的内容进行比较(感谢 .transform()
)
如果检测到更改,则将传入消息(仅需要跟踪的属性)写入新的 OUT_TOPIC
并在“本地存储”中添加/替换条目
有没有一个更简单,更优雅的方法,来实现这一点?
你能确认一下如果我装上 STORE_TOPIC
直接作为ktable: .table(STORE_TOPIC)
,我只能访问实体的当前状态,而无法访问实体的早期版本并对其进行比较?
暂无答案!
目前还没有任何答案,快来回答吧!