试图理解过滤器不起作用

5sxhfpxr  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(199)

我有点困惑为什么下面的过滤器不工作。我将输入主题中的流值作为“start”。你知道为什么这个简单的方法不起作用吗?我正在Java11上使用SpringCloudStream3.0,SpringBoot2.2.1.release。
代码段

public Function<KStream<String, String>, KStream<String, String>> filterstream() {
            return input -> input.map((key, value) -> new KeyValue<>(key, value))
                    .peek((key, value) -> System.out.println("......Key :" + key + "....Value: " + value))
                    .filter((key, value) -> value.equalsIgnoreCase("start"))
                    .peek((key, value) -> System.out.println(".after filter.....Key :" + key + "....Value: " + value));
       }

应用程序.yml

filterstream-in-0:
          destination: myinput
        filterstream-out-0:
          destination: filteredoutput

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题