基于apachekafka绑定器的spring云流函数模型

bxjv4tth  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(362)

这是这个问题的续篇。我可以将“普通”apachekafka活页夹与函数模型一起使用吗?到目前为止,使用基于注解的配置,我混合了两者, spring-cloud-stream-binder-kafka 用于简单的消费/生产和 spring-cloud-stream-binder-kafka-streams 用于一个应用程序中的高级流处理。
功能模型似乎只有 streams 如果我尝试混合使用这两种方法——基于注解的简单用法和用于流的功能性方法,流绑定就不会注册。

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            simple-binding-in:
              destination: another-topic

public interface SimpleBinding {

    String INPUT = "simple-binding-in";

    @Input(INPUT)
    SubscribableChannel simpleIn();

}

@Component
public class SimpleListener {

    @StreamListener(SimpleBinding.INPUT)
    public void listen(@Payload SomeDto payload) {
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}
``` `@EnableBinding(SimpleBinding.class)` 配置类中存在。是否建议/支持按说明混合使用这两种方法,还是我应该使用 `streams-binder` 即使是简单的信息消费?
qyuhtwio

qyuhtwio1#

对于kafka活页夹,您可以而且绝对应该使用函数模型,同时忘记streamlistener。这样,它将与您的kstream功能模型保持一致。

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            listen-in-0:
              destination: another-topic

@Component
public class SimpleListener {

    @Bean
    public Consumer<SomeDto> listen() {
        return payload -> ...
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}

相关问题