我有一个kafka流应用程序,使用spring云流kafka流框架。
应用程序具有:
Bean-1 (defined as a Function<KStream<String,InputEventType>,KStream<String,OutputEventType>> function1() )
Bean-2 (defined as a Function<KStream<String,InputEventType>,KStream<String,OutputEventType>> function2() )
这些函数基于inputeventtype的特定字段值处理消息。
在应用程序yaml中,我使用以下属性在绑定级别为每个函数分配了applicationid:
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
上述两个函数的输入主题是相同的,即来自同一输入主题的两个流。
两者都使用transformvalues api来访问statestore并对其执行puts/get。
如果上面的两个bean使用相同的状态存储(至少是名称),它们是否共享相同的数据,因为它们只是在同一个应用程序示例中运行的处理器?
我猜不是因为它们都被配置为具有不同的绑定级别applicationid
但是如果我分配一个绑定器级别的applicationid,那么只有一个处理器获得消息。我猜这是因为kafka流引擎会将该applicationid的消费偏移量视为两个处理器的公共偏移量。
暂无答案!
目前还没有任何答案,快来回答吧!