如何在SpringCloudStream的kafka进程拓扑中使用交互式查询?

tp5buhyn  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(285)

是否可以在spring cloud stream类中使用带有@enablebinding注解的交互式查询(interactivequeryservice),或者在带有@streamlistener的方法中使用交互式查询?我尝试在提供的kstreammusicsampleapplication类和进程方法中示例化readonlykeyvaluestore,但始终为null。
我的@streamlistener方法正在监听一堆ktable和kstream,在过滤等过程中,我必须检查kstream中的密钥是否已经存在于特定的ktable中。
我试着找出如何扫描一个传入的ktable来检查一个密钥是否已经存在,但是没有运气。然后我遇到了interactivequeryservice,它的get()方法可以用来检查状态存储中是否存在一个键,它是从一个ktable实现的。问题是,我无法使用流程拓扑从中访问它(@enablebinding或@streamlistener)。它只能从这些注解外部访问,例如restcontroller。
有没有一种方法可以扫描一个传入的ktable来检查是否存在一个键或值?如果没有,那么我们可以访问流程拓扑中的interactivequeryservice吗?

xkrw2x1b

xkrw2x1b1#

InteractiveQueryService 在spring中,云数据流不可用于 StreamListener . 正如您提到的,它应该在主拓扑之外使用。但是,在您描述的用例中,您仍然可以使用主流中的状态存储。例如,如果您有一个传入的 KStream 和一个 KTable 具体化为状态存储,然后您可以调用 processKStream 这样就可以进入国家商店。下面是一个粗略的代码来实现这一点。您需要将其转换为适合您的特定用例,但这里有一个想法。

ReadOnlyKeyValueStore<Object, String> store;

 input.process(() -> new Processor<Object, Product>() {

                @Override
                public void init(ProcessorContext processorContext) {
                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");

                }

                @Override
                public void process(Object key, Object value) {
                    //find the key
                    store.get(key);
                }

                @Override
                public void close() {
                    if (state != null) {
                        state.close();
                    }
                }
            }, "my-store");

相关问题