apachekafka—对组中的使用者调用seek是否设置了整个组的偏移量

sqserrrh  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(455)

我在同一个组中有3个使用者,我调用seek()来设置一个使用者上一个分区的偏移量。这将设置组中所有使用者的偏移量,还是仅设置特定分区上的偏移量。
我希望能够重置组以重新开始日志中的所有记录。但我只会给一个消费者打电话。

nfzehxib

nfzehxib1#

在同一组中有3个使用者意味着一个分区只分配给这些使用者中的一个,因此当您调用seek时,它只对只有一个使用者的分区有效,因此其他使用者不受影响。

bmp9r5qi

bmp9r5qi2#

seek()将仅回放到该分区中指定的偏移量。然而,对于旧的高级消费api,没有办法做到这一点。但在调用方法时

public void seek(TopicPartition partition,
    long offset);

它将倒带到所提供主题分区的分区和主题中的偏移位置。
如果要重置组消耗,则必须使用每个分区的偏移量对topicpartition进行seek调用。
或者您可以使用offsetsfortimes并调用组分区,如下所示:

Map<TopicPartition, Long> query = new HashMap<>();
query.put(new TopicPartition("topic-name", 0),Instant.now().minus(10, MINUTES).toEpochMilli());
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
result.entrySet().stream().forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));

如果您使用的是较旧的高级使用者,则当前没有api来重置使用者中的偏移量。唯一的方法是停止所有使用者并手动重置zk中该使用者组的偏移量。
但是,在0.11.0版本中,添加了一个工具来重置具有不同范围(如主题、分区等)的组的偏移量。您可以在此处找到详细信息:https://cwiki.apache.org/confluence/display/kafka/kip-122%3a+add+reset+consumer+group+offsets+tooling . 以及https://issues.apache.org/jira/browse/kafka-4743

相关问题