是否可以在spring cloud stream类中使用带有@enablebinding注解的交互式查询(interactivequeryservice),或者在带有@streamlistener的方法中使用交互式查询?我尝试在提供的kstreammusicsampleapplication类和进程方法中示例化readonlykeyvaluestore,但始终为null。
我的@streamlistener方法正在监听一堆ktable和kstream,在过滤等过程中,我必须检查kstream中的密钥是否已经存在于特定的ktable中。
我试着找出如何扫描一个传入的ktable来检查一个密钥是否已经存在,但是没有运气。然后我遇到了interactivequeryservice,它的get()方法可以用来检查状态存储中是否存在一个键,它是从一个ktable实现的。问题是,我无法使用流程拓扑从中访问它(@enablebinding或@streamlistener)。它只能从这些注解外部访问,例如restcontroller。
有没有一种方法可以扫描一个传入的ktable来检查是否存在一个键或值?如果没有,那么我们可以访问流程拓扑中的interactivequeryservice吗?
1条答案
按热度按时间xkrw2x1b1#
InteractiveQueryService
在spring中,云数据流不可用于StreamListener
. 正如您提到的,它应该在主拓扑之外使用。但是,在您描述的用例中,您仍然可以使用主流中的状态存储。例如,如果您有一个传入的KStream
和一个KTable
具体化为状态存储,然后您可以调用process
上KStream
这样就可以进入国家商店。下面是一个粗略的代码来实现这一点。您需要将其转换为适合您的特定用例,但这里有一个想法。