基于apachekafka绑定器的spring云流函数模型

bxjv4tth  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(390)

这是这个问题的续篇。我可以将“普通”apachekafka活页夹与函数模型一起使用吗?到目前为止,使用基于注解的配置,我混合了两者, spring-cloud-stream-binder-kafka 用于简单的消费/生产和 spring-cloud-stream-binder-kafka-streams 用于一个应用程序中的高级流处理。
功能模型似乎只有 streams 如果我尝试混合使用这两种方法——基于注解的简单用法和用于流的功能性方法,流绑定就不会注册。

  1. spring.cloud:
  2. stream:
  3. function:
  4. definition: processStream
  5. bindings:
  6. processStream-in-0:
  7. destination: my-topic
  8. simple-binding-in:
  9. destination: another-topic
  10. public interface SimpleBinding {
  11. String INPUT = "simple-binding-in";
  12. @Input(INPUT)
  13. SubscribableChannel simpleIn();
  14. }
  15. @Component
  16. public class SimpleListener {
  17. @StreamListener(SimpleBinding.INPUT)
  18. public void listen(@Payload SomeDto payload) {
  19. }
  20. }
  21. @Configuration
  22. public class FunctionalStream {
  23. @Bean
  24. public Consumer<KStream<String>> processStream() {
  25. return eventStream -> eventStream.map()
  26. }
  27. }
  28. ``` `@EnableBinding(SimpleBinding.class)` 配置类中存在。是否建议/支持按说明混合使用这两种方法,还是我应该使用 `streams-binder` 即使是简单的信息消费?
qyuhtwio

qyuhtwio1#

对于kafka活页夹,您可以而且绝对应该使用函数模型,同时忘记streamlistener。这样,它将与您的kstream功能模型保持一致。

  1. spring.cloud:
  2. stream:
  3. function:
  4. definition: processStream
  5. bindings:
  6. processStream-in-0:
  7. destination: my-topic
  8. listen-in-0:
  9. destination: another-topic
  10. @Component
  11. public class SimpleListener {
  12. @Bean
  13. public Consumer<SomeDto> listen() {
  14. return payload -> ...
  15. }
  16. }
  17. @Configuration
  18. public class FunctionalStream {
  19. @Bean
  20. public Consumer<KStream<String>> processStream() {
  21. return eventStream -> eventStream.map()
  22. }
  23. }
展开查看全部

相关问题