我正在使用springkafka开发一个消费者应用程序。我计划用pod保持它24*7的运行。但是,最近我知道有一些批处理过程会在这两者之间运行。而且,当这些批正在运行时,我们的处理不应该发生。所以,也许,不知何故,我不得不停止轮询记录,当批处理完成后,我就可以继续处理了。但是,我不知道如何做到这一点。。
无论批处理是否正在运行,我都可以通过查看一些标志查询并从表中获取详细信息。但是,我怎样才能停止对记录的投票呢?如果我只是让消费者应用程序运行而不进行任何处理,这不会导致重新平衡吗?
配置类:
@Bean
public ConsumerFactory<String, GenericRecord> consumerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, MAX_POLL_INTERVAL);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KAFKA_CONCURRENCY);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // manual async comm
return factory;
}
代码:
@KafkaListener(topics = "${app.topic}", groupId = "${app.group_id_config}")
public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgement) throws Exception {
try {
int flag = getBatchIndquery ();
// How to stop and resume based on the flag value---?
// business logic process once the consumer resumes
processRecords();
InsertDb();
acknowledgement.acknowledge();
}catch (Exception ex) {
System.out.println(record);
System.out.println(ex.getMessage());
}
}
1条答案
按热度按时间bgibtngc1#
使用端点注册表停止和启动容器。。。
请参阅@kafkalistener lifecycle management。
为@kafkalistener注解创建的侦听器容器不是应用程序上下文中的bean。相反,它们是用kafkalistenerendpointregistry类型的基础结构bean注册的。这个bean由框架自动声明并管理容器的生命周期;它将自动启动任何将autostartup设置为true的容器。所有容器工厂创建的所有容器必须处于同一阶段。有关详细信息,请参阅侦听器容器自动启动。您可以使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有已注册的容器。或者,可以通过使用单个容器的id属性来获取对该容器的引用。您可以在注解上设置autostartup,这将覆盖在容器工厂中配置的默认设置。您可以从应用程序上下文(如auto-wiring)获取对bean的引用,以管理其注册的容器。