我有一个最新版本的Spring Cloud Stream应用程序,我使用的是函数式的方法。要求是接收消息,避免重复的消息(因为生产者可能发布重复的消息),然后转换消息,最后将计数放入状态存储中。为了满足避免重复消息的需求,我正在考虑使用中间状态存储,但我不知道如何将状态存储与当前KStream链接。
下面是示例代码:
@Bean
public Function<KStream<String, String>, KStream<String, Long>> sampleProcessor() {
return source -> source
.mapValues(value -> value.toUpperCase())
.groupBy((key, value) -> value)
.count(Materialized.as("count-store"))
.toStream();
}
例如:如果我发布以下消息:
m1 = Hello
m2 = World
m3 = Hello
然后,只需要消耗m1和m2。
1条答案
按热度按时间bq3bfh9z1#
我可以通过创建一个StoreBuilder bean来实现:
然后使用KStream的这个store in transform方法如下: