我有一个kafka streams应用程序,它对传入状态进行操作,需要在写入下一个主题之前存储状态。只有在本地存储中更新状态后,才应进行写入。
像这样的。
stream.map(this::getAndUpdateState)
.map(this::processStateAndEvent)
.to("topicname");
所以在 getAndUpdateState()
我可以做喜欢的事
state = store.get(key); // or new if null
state = updateState(state, event); // update changes to state
store.put(key, state); // write back the state
return state;
如何在kafka存储上实现简单的get()和put()操作?我已经尝试过使用keyvaluestore,但它有问题,因为我必须添加一个源和接收器处理器等等。
或者,使用ktable或其他一些概念来获取和放置Kafka也可以。
2条答案
按热度按时间oiopk7p51#
感谢用户152468和matthias j.的建议。萨克斯。
我能够在kafka流中使用
transform()
方法。下面给出了基于原始管道示例的完整工作代码。管道.java:
bq3bfh9z2#
听起来你想做批处理。kafka streams是一个流处理库,所有处理器并行/并发运行,构建一个数据管道。
我想你还是可以用的
transform()
附加状态,不向下游发出任何信息,只将数据放入存储区。您可以安排一个挂钟时间标点来扫描整个商店,并向下游发出商店中的所有数据。然而,总的来说,这似乎是一种反模式。思考最难理解的部分是,什么时候状态是“完全加载”的——因为一个主题在定义上/概念上是无限的,加载状态“永远”不会结束。