如何使用SpringCloudKafka流绑定器从单个输入流创建多个输出流?

dxxyhpgq  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(368)

我正在尝试从单个输入流创建多个输出流(取决于不同的时间窗口)。

interface AnalyticsBinding {
        String PAGE_VIEWS_IN = "pvin";
        String PAGE_VIEWS _COUNTS_OUT_Last_5_Minutes = "pvcout_last_5_minutes";
        String PAGE_VIEWS _COUNTS_OUT_Last_30_Minutes = "pvcout_last_30_minutes";
        @Input(PAGE_VIEWS_IN)
        KStream<String, PageViewEvent> pageViewsIn();
        @Output(PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes)
        KStream<String,Long> pageViewsCountOutLast5Minutes();
        @Output(PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes)
        KStream<String,Long> pageViewsCountOutLast30Minutes();
    }

  @StreamListener
  @SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes })
    public KStream<String, Long> processPageViewEventForLast5Mintues(
            @Input(AnalyticsBinding.PAGE_VIEWS_IN)KStream<String, PageViewEvent> stream) {
                  // aggregate by Duration.ofMinutes(5)
    }

  @StreamListener
  @SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes })
    public KStream<String, Long> processPageViewEventForLast30Mintues(
            @Input(AnalyticsBinding.PAGE_VIEWS_IN)KStream<String, PageViewEvent> stream) {
                  // aggregate by Duration.ofMinutes(30)
}

当我启动应用程序时,只有一个流任务可以工作,有没有办法让ProcessPageViewEventforLast5Minutes和ProcessPageViewEventforLast30minutes同时工作

zynd9foi

zynd9foi1#

您在两个处理器中使用相同的输入绑定,这就是为什么您只看到一个在工作。在绑定接口中添加另一个输入绑定,并将其目标设置为同一主题。另外,换一个 StreamListener 方法来使用此新绑定名称。
也就是说,如果您使用的是最新版本的springcloudstream,那么您应该考虑迁移到功能模型。例如,以下操作应起作用。

@Bean
public Function<KStream<String, PageViewEvent>, KStream<String, Long>> processPageViewEventForLast5Mintues() {
...
}

@Bean
public Function<KStream<String, PageViewEvent>, KStream<String, Long>> processPageViewEventForLast30Mintues() {
...
}

在这种情况下,绑定器会自动创建两个不同的输入绑定。您可以在这些绑定上设置目的地。

spring.cloud.stream.bindings.processPageViewEventForLast5Mintues-in-0.destination=<your Kafka topic>
spring.cloud.stream.bindings.processPageViewEventForLast30Mintues-in-0.destination=<your Kafka topic>

相关问题