我正在尝试设置一个spring boot应用程序,听kafka的。
我用的是Kafka流活页夹。
一个简单的 @EnableBinding
```
@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {
@StreamListener(StreamProcessor.INPUT)
@SendTo(StreamProcessor.OUTPUT)
public KStream<String, String> process(KStream<String, String> input) {
logger.info("Stream listening");
return input
.peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
}
interface StreamProcessor {
String INPUT = "input_1";
String OUTPUT = "output_1";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
}
}
而且在 `application.yml` ```
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:29092
bindings:
input_1:
destination: mytopic1
group: readgroup
output_1:
destination: mytopic2
input_2:
destination: mytopic3
group: readgroup
output_2:
destination: mytopic4
application:
name: stream_s1000_app
一切正常。
但如果我尝试添加另一个具有其他绑定的类,则会出现以下错误:
以下订阅的主题未分配给任何成员:[mytopic1]
第二个绑定示例:
@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {
@StreamListener(StreamProcessor.INPUT)
@SendTo(StreamProcessor.OUTPUT)
public KStream<String, String> process(KStream<String, String> input) {
logger.info("Stream listening binding two");
return input
.peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
}
interface StreamProcessor {
String INPUT = "input_2";
String OUTPUT = "output_2";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
}
}
我错过了什么?我不能在同一个应用程序中使用多个输入主题和多个输出吗?有与application.name相关的内容吗?
2条答案
按热度按时间y4ekin9u1#
尝试
a1o7rhls2#
我刚试了一个应用程序,结果成功了。当在同一个应用程序中有多个处理器时,需要确保每个处理器都有自己的应用程序id
application.yml
.我看到两个处理器都登录到控制台上了。此外,还看到了输出主题上的消息。
application.yml的相关部分