apache-kafka 如何在一段时间间隔后阅读Kafka消费者的信息

zpjtge22  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(139)

在我的Sping Boot 应用程序中,我有Kafka消费者类,每当主题中有消息可用时,它就会频繁地读取消息。我想限制消费者每隔2小时消费一次消息。就像在阅读一条消息后,消费者会暂停2小时,然后再消费另一条消息。这是我的消费者配置方法:-

@Bean
public Map<String, Object> scnConsumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();

    // common props
    logger.info("KM Dataloader :: Kafka Brokers for Software topic: {}", bootstrapServersscn);
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersscn);
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 7200000);

    // ssl props
    propsMap.put("security.protocol", mpaasSecurityProtocol);
    propsMap.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    propsMap.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
    propsMap.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystorePath);
    propsMap.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
    return propsMap;
}

然后我创建了这个容器方法,在这里我设置了Kafka配置的其余部分

ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    LOGGER.info("Setting concurrency to {} for {}", config.getConcurrency(), topicName);
    factory.setConcurrency(config.getConcurrency());
    factory.setConsumerFactory(cFactory);
    factory.setRetryTemplate(retryTemplate);
    factory.getContainerProperties().setIdleBetweenPolls(7200000);
    return factory;

使用此代码分区每2小时重新平衡一次,但它根本不阅读消息。我的Kafka消费者方法:-

@Bean
public KmKafkaListener softwareKafkaListener(KmSoftwareService softwareService) {
    return new KmKafkaListener(softwareService) {
        @KafkaListener(topics = SOFTWARE_TOPIC, containerFactory = "softwareMessageContainer", groupId = SOFTWARE_CONSUMER_GROUP)
        public void onscnMessageforSA20(@Payload ConsumerRecord<String, Object> record)
                throws InterruptedException {
            this.onMessage(record);
        }
    };
}
nwlqm0z1

nwlqm0z11#

尝试在KmKafkaListener中添加带注解的@KafkaListener方法,以便Spring kafka负责调用它。

public class KmKafkaListener{

  @KafkaListener(topics = SOFTWARE_TOPIC, containerFactory = "softwareMessageContainer", groupId = SOFTWARE_CONSUMER_GROUP)
  public void onscnMessageforSA20(@Payload ConsumerRecord<String, Object> record)
                throws InterruptedException {
            this.onMessage(record);
        }
}

并以这种方式初始化Bean

@Bean
public KmKafkaListener softwareKafkaListener(KmSoftwareService softwareService) {
    return new KmKafkaListener(softwareService);
}

相关问题