我想以编程的方式重读Kafka的所有事件。我知道有一个应用程序重置工具,但据我所知,这需要我关闭我的应用程序。我无法在我的用例中关闭应用程序。有没有办法让我的应用程序重读Kafka主题的所有事件?示例或代码片段将不胜感激。最好但不一定使用Kafka流。
1qczuiv01#
您不能用kafka流重新阅读主题,但使用“普通”kafka可以将使用者定位到任何有效偏移量。像这样的
final Map<Integer, TopicPartition> partitions = new HashMap<>(); // get all partitions for the topic for (final PartitionInfo partition : consumer.partitionsFor("your_topic")) { final TopicPartition tp = new TopicPartition("your_topic", partition.partition()); partitions.put(partition.partition(), tp); } consumer.assign(partitions.values()); consumer.seekToBeginning(partitions.values());
final Map<Integer, TopicPartition> partitions = new HashMap<>();
// get all partitions for the topic
for (final PartitionInfo partition : consumer.partitionsFor("your_topic")) {
final TopicPartition tp = new TopicPartition("your_topic", partition.partition());
partitions.put(partition.partition(), tp);
}
consumer.assign(partitions.values());
consumer.seekToBeginning(partitions.values());
jei2mxaa2#
消费者必须停止,以避免在提交补偿和修改补偿的消费者之间遇到竞争条件。如果希望保留使用者组id,可以使用kafka consumer seek API查找最早的偏移量。然后可以使用adminclient来更改使用者组偏移量。 kafka-consumer-groups --reset-offsets 实施应该是一个很好的例子,说明如何做到这一点:https://github.com/apache/kafka/blob/85b6545b8159885c57ab67e08b7185be8a607988/core/src/main/scala/kafka/admin/consumergroupcommand.scala#l446-l469号否则,如果您的 auto.offset.reset 设置为最早。
kafka-consumer-groups --reset-offsets
auto.offset.reset
2条答案
按热度按时间1qczuiv01#
您不能用kafka流重新阅读主题,但使用“普通”kafka可以将使用者定位到任何有效偏移量。
像这样的
jei2mxaa2#
消费者必须停止,以避免在提交补偿和修改补偿的消费者之间遇到竞争条件。
如果希望保留使用者组id,可以使用kafka consumer seek API查找最早的偏移量。然后可以使用adminclient来更改使用者组偏移量。
kafka-consumer-groups --reset-offsets
实施应该是一个很好的例子,说明如何做到这一点:https://github.com/apache/kafka/blob/85b6545b8159885c57ab67e08b7185be8a607988/core/src/main/scala/kafka/admin/consumergroupcommand.scala#l446-l469号否则,如果您的
auto.offset.reset
设置为最早。