我想确认我对多个处理器读取一个Kafka流源的效率的理解。如果我想根据 predicate 逻辑执行两个不同的进程,我相信下面的示例1是最有效的。 predicate 查看值的内容(这里是通知对象)。如果在示例1中的以下每个处理器中都有一个断点,那么它将显示每个传入通知都会调用每个函数。而在示例2中,只有满足 predicate 逻辑时才调用process2函数。
例1
@Bean
public Function<KStream<String, Notification>,KStream<String, Notification>> process1() {
return input -> input
.branch(PREDICATE_FOR_OUT_0, PREDICATE_FOR_OUT_1);
}
@Bean
public Function<KStream<String, Notification>,KStream<String, EnrichedNotification>> process2() {
return input -> input
.filter(PREDICATE_FOR_OUT_2);
.map((key, value) ->.........; //different additional processing to map to EnrichedNotification type
}
不需要执行以下操作并尝试将一个处理器的输出路由到另一个处理器(不确定是否可能)
例2(概念性的)我可能是这样想的,因为我是从使用纯Kafka来的。这里process1有一个3路分支。其中两个分支分别转到各自的流,然后转到主题,但第三个分支需要进一步处理,然后才能路由到主题。
@Bean
public Function<KStream<String, Notification>,KStream<String, Notification>[]> process1() {
return input -> input
.branch(PREDICATE_FOR_OUT_0, PREDICATE_FOR_OUT_1, PREDICATE_FOR_OUT_2);
}
我们是否可以潜在地将 predicate \u for \u out \u 2的分支路由到process2。这意味着只有在满足 predicate \u for \u out \u 2时才会调用process2
@Bean
public Function<KStream<String, Notification>,KStream<String, EnrichedNotification>> process2() {
return input -> input
.map((key, value) ->.........; //different additional processing to map to EnrichedNotification type
}
我的想法是,由于kafka streams提供的抽象和功能,示例2是多余的(而且实际上无论如何都不可能)
1条答案
按热度按时间5f0d552i1#
我认为你举的两个例子都能完成任务,但有一些不同之处。在第一个示例中,有两个函数,都从同一个kafka主题接收数据,第二个函数在路由到输出主题之前执行一些附加逻辑。在第二个示例中,您还有两个函数。在第一个函数中,您有3个分支,每个分支将数据发送到Kafka主题(我假设它们是3个不同的主题)。然后在第二个函数中,从第一个函数的第三个输出主题接收数据。在执行示例2中的第二个函数中的逻辑之后,将其发送到该分支的最终目的地。您将为第二个示例引入一个额外的主题。我认为你的第一个例子更具可读性和清晰性。