如何在Spring Cloud Stream应用程序中使用自定义Kafka状态存储

ajsxfq5m  于 2023-04-30  发布在  Spring
关注(0)|答案(1)|浏览(112)

我有一个最新版本的Spring Cloud Stream应用程序,我使用的是函数式的方法。要求是接收消息,避免重复的消息(因为生产者可能发布重复的消息),然后转换消息,最后将计数放入状态存储中。为了满足避免重复消息的需求,我正在考虑使用中间状态存储,但我不知道如何将状态存储与当前KStream链接。
下面是示例代码:

@Bean
public Function<KStream<String, String>, KStream<String, Long>> sampleProcessor() {
    return source -> source
            .mapValues(value -> value.toUpperCase())
            .groupBy((key, value) -> value)
            .count(Materialized.as("count-store"))
            .toStream();
}

例如:如果我发布以下消息:

m1 = Hello
m2 = World
m3 = Hello

然后,只需要消耗m1和m2。

bq3bfh9z

bq3bfh9z1#

我可以通过创建一个StoreBuilder bean来实现:

@Bean
public StoreBuilder<KeyValueStore<String, String>> storeBuilder() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("value-store"),
            Serdes.String(),
            Serdes.String()
    );
}

然后使用KStream的这个store in transform方法如下:

@Bean
public Function<KStream<String, String>, KStream<String, Long>> sampleProcessor() {
    return source -> source
            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                private KeyValueStore<String, String> valueStore;

                @Override
                public void init(ProcessorContext context) {
                    valueStore = context.getStateStore("value-store");
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    if (isNull(valueStore.get(value))) {
                        valueStore.put(value, "CONST");
                        return KeyValue.pair(key, value);
                    }
                    return null;
                }

                @Override
                public void close() {
                }
            }, "value-store")
            .mapValues(value -> value.toUpperCase())
            .groupBy((key, value) -> value)
            .count(Materialized.as("count-store"))
            .toStream();
}

相关问题