我有三个消费者,都有多个示例,都在消费同一个主题。我希望每个消费者消费一次这个主题。为此,我创建了一个消费群体。据我所知,kafka应该足够聪明,可以选择一个服务示例来使用该主题,但这并没有发生,而且所有消费者的所有示例都在使用该主题
我认为这可能与三个消费者都具有相同的组名有关,因此我关闭了其中两个消费者,让一个消费者有两个示例,但我仍然看到两条记录进入数据库,而消费者组只应选择一个示例插入数据库。
我是做错了什么还是遗漏了什么?
应用程序.yml
spring:
cloud:
stream:
bindings:
input-data:
destination: publisheddata.t
group: publisheddata
kafka:
bindings:
input:
consumer:
autoCommitOffset: false
binder:
auto-create-topics: true
kafka:
mode: raw
spring:
cloud:
stream:
kafka:
binder:
brokers: kafka:9092
zk-nodes: kafka:2181
频道.java
public interface Channels {
String INPUT_DATA = "input-data";
@Input(INPUT_DATA)
SubscribableChannel dataInput();
}
数据处理程序.java
@EnableBinding(Channels.class)
@Configuration
public class DataMessageHandler {
@StreamListener(Channels.INPUT_DATA)
public void handle(Message<?> message) {
... handling message ...
}
1条答案
按热度按时间eulz3vhy1#
万一以后有人碰到这个问题,在研究了几天之后,我发现问题出在我使用的springcloud版本上
Brixton.RELEASE
,一旦我把这个更新到Dalston.SR2
这解决了我的问题