多@enablebinding与kafka spring cloud stream

8yoxcaq7  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(426)

我正在尝试设置一个spring boot应用程序,听kafka的。
我用的是Kafka流活页夹。
一个简单的 @EnableBinding ```
@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {

@StreamListener(StreamProcessor.INPUT)
@SendTo(StreamProcessor.OUTPUT)
public KStream<String, String> process(KStream<String, String> input) {

    logger.info("Stream listening");

    return input
            .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
}

interface StreamProcessor {

    String INPUT = "input_1";
    String OUTPUT = "output_1";

    @Input(INPUT)
    KStream<String, String> input();

    @Output(OUTPUT)
    KStream<String, String> outputProcessed();
}

}

而且在 `application.yml` ```
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
      bindings:
         input_1:
           destination: mytopic1
           group: readgroup
         output_1:
           destination: mytopic2
         input_2:
           destination: mytopic3
           group: readgroup
         output_2:
           destination: mytopic4
  application:
    name: stream_s1000_app

一切正常。
但如果我尝试添加另一个具有其他绑定的类,则会出现以下错误:
以下订阅的主题未分配给任何成员:[mytopic1]
第二个绑定示例:

@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening binding two");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

我错过了什么?我不能在同一个应用程序中使用多个输入主题和多个输出吗?有与application.name相关的内容吗?

y4ekin9u

y4ekin9u1#

尝试

@EnableBinding( { StreamExample.StreamProcessor.class, StreamExampleBindingTwo.StreamProcessor.class })
a1o7rhls

a1o7rhls2#

我刚试了一个应用程序,结果成功了。当在同一个应用程序中有多个处理器时,需要确保每个处理器都有自己的应用程序id application.yml .
我看到两个处理器都登录到控制台上了。此外,还看到了输出主题上的消息。

@SpringBootApplication
@EnableBinding({So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class})
public class So54522918Application {

    public static void main(String[] args) {
        SpringApplication.run(So54522918Application.class, args);
    }

    @StreamListener(StreamProcessor1.INPUT)
    @SendTo(StreamProcessor1.OUTPUT)
    public KStream<String, String> process1(KStream<String, String> input) {

        System.out.println("Stream listening");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    @StreamListener(StreamProcessor2.INPUT)
    @SendTo(StreamProcessor2.OUTPUT)
    public KStream<String, String> process2(KStream<String, String> input) {

        System.out.println("Stream listening binding two");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    interface StreamProcessor1 {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

    interface StreamProcessor2 {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

}

application.yml的相关部分

spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  bindings.input_1.consumer.application-id: process-1
  bindings.input_2.consumer.application-id: process-2
spring.cloud.stream.bindings.input_1:
  destination: mytopic1
spring.cloud.stream.bindings.output_1:
  destination: mytopic2
spring.cloud.stream.bindings.input_2:
  destination: mytopic3
spring.cloud.stream.bindings.output_2:
  destination: mytopic4

相关问题