重读Kafka的一个主题

vsikbqxv  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(342)

我想以编程的方式重读Kafka的所有事件。我知道有一个应用程序重置工具,但据我所知,这需要我关闭我的应用程序。我无法在我的用例中关闭应用程序。
有没有办法让我的应用程序重读Kafka主题的所有事件?示例或代码片段将不胜感激。最好但不一定使用Kafka流。

1qczuiv0

1qczuiv01#

您不能用kafka流重新阅读主题,但使用“普通”kafka可以将使用者定位到任何有效偏移量。
像这样的

  1. final Map<Integer, TopicPartition> partitions = new HashMap<>();
  2. // get all partitions for the topic
  3. for (final PartitionInfo partition : consumer.partitionsFor("your_topic")) {
  4. final TopicPartition tp = new TopicPartition("your_topic", partition.partition());
  5. partitions.put(partition.partition(), tp);
  6. }
  7. consumer.assign(partitions.values());
  8. consumer.seekToBeginning(partitions.values());
jei2mxaa

jei2mxaa2#

消费者必须停止,以避免在提交补偿和修改补偿的消费者之间遇到竞争条件。
如果希望保留使用者组id,可以使用kafka consumer seek API查找最早的偏移量。然后可以使用adminclient来更改使用者组偏移量。 kafka-consumer-groups --reset-offsets 实施应该是一个很好的例子,说明如何做到这一点:https://github.com/apache/kafka/blob/85b6545b8159885c57ab67e08b7185be8a607988/core/src/main/scala/kafka/admin/consumergroupcommand.scala#l446-l469号
否则,如果您的 auto.offset.reset 设置为最早。

相关问题