我有两个spring引导项目,第一个是一个生产者,它用一个分区向一个主题发送消息。
第二个是使用者应用程序,它读取主题和一个分区。对于消费者,我使用kafkamessagedrivenchanneladapter、kafkamessagelistenercontainer并在consumerfactory中指定消费者组id。
注意,我使用的是spring集成kafka 2.0.0.release和spring kafka 1.0.2.release,后者使用kakfa0.9。我正在docker容器中运行3个docker示例或kafka 0.10.0和一个zookeeper示例。
当我运行我的消费者的一个示例时,它工作得很好,读取消息,处理消息。
但是,当我运行应用程序的第二个示例(我只是更改端口)时,生产者应用程序生成的任何消息都会被两个示例接收,从而导致每个消息处理两次。
根据文档,我觉得这个场景应该是可行的,因为在这个例子中第二个示例的原因是为了弹性,如果一个应用示例宕机,另一个会接管,但不是两个都应该得到消费者组中相同主题/分区的消息。注意,我使用服务激活器(facade)来处理消息。
有什么我不知道的吗。
请帮忙。
以下是我基于spring integration kafka示例的消费者应用程序配置:
{
@ServiceActivator(inputChannel = "received", outputChannel = "nullChannel", adviceChain = {"requestHandlerRetryAdvice"})
@Bean
public MessageConsumerServiceFacade messageConsumerServiceFacade() {
return new DefaultMessageConsumerServiceFacade();
}
@ServiceActivator(inputChannel = "errorChannel", outputChannel = "nullChannel")
@Bean
public MessageConsumerServiceFacade messageConsumerErrorServiceFacade() {
return new DefaultMessageConsumerErrorServiceFacade();
}
@ServiceActivator(inputChannel = "received", outputChannel = "nullChannel", adviceChain = {"requestHandlerRetryAdvice"})
@Bean
public MessageConsumerServiceFacade messageConsumerServiceFacade() {
return new DefaultMessageConsumerServiceFacade();
}
@ServiceActivator(inputChannel = "errorChannel", outputChannel = "nullChannel")
@Bean
public MessageConsumerServiceFacade messageConsumerErrorServiceFacade() {
return new DefaultMessageConsumerErrorServiceFacade();
}
@Bean
public IntegrationFlow consumer() throws Exception {
LOGGER.info("starting consumer..");
return IntegrationFlows
.from(adapter(container()))
.get();
}
@Bean
public KafkaMessageListenerContainer<String, byte[]> container() throws Exception {
// This variant of the constructors DOES NOT WORK with Consumer Group, with this setting, all consumers receives the message - BAD for a cluster of consumer apps - duplicate message
//ContainerProperties containerProperties = new ContainerProperties( new TopicPartitionInitialOffset(this.topic, 0));
// Use THIS variant of the constructors to use Consumer Group successfully
// with auto re-balance of partitions to distribute loads among consumers, perfect for a cluster of consumer app
ContainerProperties containerProperties = new ContainerProperties(this.topic);
containerProperties.setAckOnError(false);
containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
KafkaMessageListenerContainer kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
return kafkaMessageListenerContainer;
}
@Bean
public ConsumerFactory<String, byte[]> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 2);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest, latest, none
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, byte[]> adapter(KafkaMessageListenerContainer<String, byte[]> container) {
KafkaMessageDrivenChannelAdapter<String, byte[]> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
kafkaMessageDrivenChannelAdapter.setErrorChannel(errorChannel());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public MessageChannel received() {
return new PublishSubscribeChannel();
}
@Bean
public MessageChannel errorChannel() {
return new PublishSubscribeChannel();
}
}
暂无答案!
目前还没有任何答案,快来回答吧!