SpringKafka使用消费者已经使用的旧消息

chhqkbe1  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(425)

我有一个springboot应用程序,使用springkafka。我们已经创建了一个消费者,它正在消费来自4个主题的消息。这些主题没有任何分区。我在这里面临的问题是一个rendom行为,在三个主题中,在任何一个主题偏移量停止时,我的使用者不断地从该主题中消费相同的消息,直到我们需要手动将偏移量移动到最新。下面是我的配置yaml配置:

spring:
  kafka:
   consumer:
      bootstrap-servers:  ${KAFKA_BOOTSTRAP_SERVERS}
      group-id: group_id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  consumer:
    allTopicList: user.topic,student.topic,class.topic,teachers.topic**

因为它是一个spring引导应用程序,所以默认偏移量设置为latest。我做错了什么,请帮助我理解。

6xfqseft

6xfqseft1#

你用的是什么版本?
你应该设置

...consumer:
     enable-auto-commit: false

侦听器容器将更可靠地提交偏移量。
你还应该考虑

ack-mode: RECORD

容器将提交每个成功处理的记录的偏移量(默认值为batch)。

相关问题