即使ackmode处于手动模式,当使用者应用程序在确认之前停止时,也会立即提交spring kafka偏移量

wxclj1h5  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(271)

我使用SpringKafka运行kafka消费者服务。我已将enable.auto.commit设置为false,将ackmode设置为manual\u immediate。同时使用ConcurrentKafkalListenerContainerFactory,并发性等于10。(我的主题中的分区数)
现在我在听我的主题,它很少有偏移,它滞后。我正在调试模式下运行我的应用程序,当我在kafkalistener方法中得到第一条消息时,我会强制停止我的应用程序。
应用程序在调用之前停止 acknowledgment.acknowledge() 但当我检查任何滞后时,它显示为零。
侦听器代码

@KafkaListener(topics = "baeldung")
  public void listen(ConsumerRecord<?, ?> message, Acknowledgment acknowledgment) {
    log.info("logging");

    System.out.println("Received Messasge in: "+ " " + message.value().toString());
      acknowledgment.acknowledge();
    System.out.println("done");
  }

调试指针设置为 System.out.println("Received Messasge in: "+ " " + message.value().toString()); 消费者属性:

@Bean
  public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        bootstrapAddress);
    props.put(
        ConsumerConfig.GROUP_ID_CONFIG,
        groupId);
    props.put(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    props.put(
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
       false);
    return new DefaultKafkaConsumerFactory<>(props);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String>
  kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(10);
    factory.setConsumerFactory(consumerFactory());
    factory
        .getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
  }

主题在运行消费代码之前描述:

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testMayank2

Consumer group 'testMayank2' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testMayank2     baeldung        5          135             146             11              -               -               -
testMayank2     baeldung        9          140             145             5               -               -               -
testMayank2     baeldung        6          140             145             5               -               -               -
testMayank2     baeldung        1          144             148             4               -               -               -
testMayank2     baeldung        2          141             145             4               -               -               -
testMayank2     baeldung        0          144             148             4               -               -               -
testMayank2     baeldung        7          142             147             5               -               -               -
testMayank2     baeldung        3          142             147             5               -               -               -
testMayank2     baeldung        8          141             146             5               -               -               -
testMayank2     baeldung        4          142             146             4               -               -               -

主题运行消费代码后描述:

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testMayank2

Warning: Consumer group 'testMayank2' is rebalancing.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testMayank2     baeldung        5          146             146             0               -               -               -
testMayank2     baeldung        9          140             145             5               -               -               -
testMayank2     baeldung        6          145             145             0               -               -               -
testMayank2     baeldung        1          144             148             4               -               -               -
testMayank2     baeldung        2          141             145             4               -               -               -
testMayank2     baeldung        0          148             148             0               -               -               -
testMayank2     baeldung        7          147             147             0               -               -               -
testMayank2     baeldung        3          147             147             0               -               -               -
testMayank2     baeldung        8          146             146             0               -               -               -
testMayank2     baeldung        4          146             146             0               -               -               -

spring boot版本:2.2.6.release kafka版本:2.3.1
我做错什么了吗?在异常关机期间,是否期望提交偏移量?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题