spring云流才会处理kafka消息

6yoyoihd  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(279)

参照此解决方案,我的spring cloud stream application.yml文件具有以下配置:


# application.yml

spring.cloud.stream.bindings.input:
  destination: my-topic-name
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.input:
  consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    materializedAs: my-store

在我用@enablebinding注解类的主应用程序和用@streamlistener注解的方法中,我使用kafka stream dsl和处理器api集成来访问状态存储,由于application.yml文件中存在物化数据,这些状态存储应该在应用程序启动时物化。

ReadOnlyKeyValueStore<Object, String> store;

 input.process(() -> new Processor<Object, Product>() {

                @Override
                public void init(ProcessorContext processorContext) {
                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");

                }

                @Override
                public void process(Object key, Object value) {
                    //find the key
                    store.get(key);
                }

                @Override
                public void close() {
                    if (state != null) {
                        state.close();
                    }
                }
            }, "my-store");

问题是,在应用程序第一次启动时,状态存储尚未完全填充并且尚未准备就绪(例如空状态存储),但是消息仍然提前到达并且正在由kafka流拓扑处理,从而产生意外的结果。
我们如何确保在第一次启动应用程序时,在@streamlistener中定义的任何流处理拓扑开始处理传入消息之前,已在application.yml文件中使用materializedas指示具体化的所有或特定(如果可能,用户可以定义)状态存储将被完全填充并准备使用。我们是否可以强制消息流处理等待状态存储在第一次启动应用程序时完全填充?
我试图通过修改一个spring云流示例来复制这个问题,并将修改后的版本推到这里。关于这方面的更详细的讨论也可以在这里找到

mftmpeh8

mftmpeh81#

似乎需要使用globalktable(状态存储)连接流,而不是使用进程接口。下面是kstream join globalktable。请试一试。

相关问题