在我的Sping Boot 应用程序中,我有Kafka消费者类,每当主题中有消息可用时,它就会频繁地读取消息。我想限制消费者每隔2小时消费一次消息。就像在阅读一条消息后,消费者会暂停2小时,然后再消费另一条消息。这是我的消费者配置方法:-
@Bean
public Map<String, Object> scnConsumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// common props
logger.info("KM Dataloader :: Kafka Brokers for Software topic: {}", bootstrapServersscn);
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersscn);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 7200000);
// ssl props
propsMap.put("security.protocol", mpaasSecurityProtocol);
propsMap.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
propsMap.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
propsMap.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystorePath);
propsMap.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
return propsMap;
}
然后我创建了这个容器方法,在这里我设置了Kafka配置的其余部分
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
LOGGER.info("Setting concurrency to {} for {}", config.getConcurrency(), topicName);
factory.setConcurrency(config.getConcurrency());
factory.setConsumerFactory(cFactory);
factory.setRetryTemplate(retryTemplate);
factory.getContainerProperties().setIdleBetweenPolls(7200000);
return factory;
使用此代码分区每2小时重新平衡一次,但它根本不阅读消息。我的Kafka消费者方法:-
@Bean
public KmKafkaListener softwareKafkaListener(KmSoftwareService softwareService) {
return new KmKafkaListener(softwareService) {
@KafkaListener(topics = SOFTWARE_TOPIC, containerFactory = "softwareMessageContainer", groupId = SOFTWARE_CONSUMER_GROUP)
public void onscnMessageforSA20(@Payload ConsumerRecord<String, Object> record)
throws InterruptedException {
this.onMessage(record);
}
};
}
1条答案
按热度按时间nwlqm0z11#
尝试在
KmKafkaListener
中添加带注解的@KafkaListener
方法,以便Spring kafka负责调用它。并以这种方式初始化Bean