如何将Kafka消费群体的消费补偿推进到底?

k7fdbhmy  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(262)

有没有一种简单的方法——使用kafkarestapi——将使用者组中所有分区上的使用者偏移量提前到分区的末尾?实际上,有时我希望跳过消耗所有剩余的消息——例如,如果我希望重新生成所有消息。
我知道我可以检索消费者组、检索分区、循环并查找每个分区——有没有更简单的方法?

2vuwiymt

2vuwiymt1#

根据post-consumers示例的文档,您可以在 ConsumerGroup 请求如下:

POST /consumers/testgroup/instances/my_consumer/positions/end HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

不过,您仍然需要提前知道订阅的主题和consumergroup的分区。

编辑:

我想有时跳过消费所有剩余的消息
我在这里看到了多个选项,但在我看来,所有这些选项都相当粗糙,而且也没有使用kafkarestapi。

方案1

更改保留时间( retention.ms )主题的一个小值(如 1 ),请稍等,让logcleaner删除所有消息并将保留时间更改回正常。然后生成新的替代数据。

方案2

将所有使用者的consumergroup名称更改为新的consumergroup(配置) group.id )并让消费者通过设置 auto.offset.reset=latest . 然后生成新的替代数据。

方案3

类似于我最初的回答,使用Kafka工具 kafka-consumer-groups 要手动将消费者组(例如“myconsumer”)的偏移量更改为结束偏移量,请执行以下操作:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group myConsumer --topic myTopic --to-latest

相关问题