Kafka暂停分区,花了很长时间(10分钟)再次恢复分区

col17t5w  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(235)

我使用的是具有以下配置的spring kafka:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
environment.getProperty("kafka.url"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
KafkaAvroDeserializer.class);
props.put("specific.avro.reader", true);
props.put("schema.registry.url", 
environment.getProperty("schema.registry.url"));
props.put("session.timeout.ms", 120000);
props.put("request.timeout.ms", 300000);
props.put("max.partition.fetch.bytes", "10240");
props.put("max.poll.interval.ms", 120000);
props.put("max.poll.records", 100);

以及

@Bean
public ConcurrentKafkaListenerContainerFactory<String, 
List<AlertNotificationPayload>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, 
List<AlertNotificationPayload>> factory = new 
ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(10);
}

一旦我启动服务器,使用者就开始使用消息,但是过了一段时间,很少有分区会暂停,并且几乎9-10分钟都不会恢复。使用者线程中的处理时间以毫秒为单位,并且有单独的线程进行处理。我尝试了多种配置组合,但都不起作用。
我还尝试使用设置工厂.getcontainerproperties().setpauseenabled(false);这会停止暂停/恢复,但在一段时间后,重新平衡开始发生,消费者迟早会死亡,一段时间后,消费者群体就没有消费者了。
我使用以下版本:

<dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
         <version>1.1.3.RELEASE</version> //tried latest one as well
</dependency>
<dependency>
        <groupId>io.confluent.maven</groupId>
        <artifactId>kafka-connect-quickstart</artifactId>
        <version>0.10.0.0</version>
</dependency>
<dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>3.2.0</version>
</dependency>

请帮忙。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题