我在同一个组中有3个使用者,我调用seek()来设置一个使用者上一个分区的偏移量。这将设置组中所有使用者的偏移量,还是仅设置特定分区上的偏移量。我希望能够重置组以重新开始日志中的所有记录。但我只会给一个消费者打电话。
nfzehxib1#
在同一组中有3个使用者意味着一个分区只分配给这些使用者中的一个,因此当您调用seek时,它只对只有一个使用者的分区有效,因此其他使用者不受影响。
bmp9r5qi2#
seek()将仅回放到该分区中指定的偏移量。然而,对于旧的高级消费api,没有办法做到这一点。但在调用方法时
public void seek(TopicPartition partition, long offset);
它将倒带到所提供主题分区的分区和主题中的偏移位置。如果要重置组消耗,则必须使用每个分区的偏移量对topicpartition进行seek调用。或者您可以使用offsetsfortimes并调用组分区,如下所示:
Map<TopicPartition, Long> query = new HashMap<>(); query.put(new TopicPartition("topic-name", 0),Instant.now().minus(10, MINUTES).toEpochMilli()); Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query); result.entrySet().stream().forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));
如果您使用的是较旧的高级使用者,则当前没有api来重置使用者中的偏移量。唯一的方法是停止所有使用者并手动重置zk中该使用者组的偏移量。但是,在0.11.0版本中,添加了一个工具来重置具有不同范围(如主题、分区等)的组的偏移量。您可以在此处找到详细信息:https://cwiki.apache.org/confluence/display/kafka/kip-122%3a+add+reset+consumer+group+offsets+tooling . 以及https://issues.apache.org/jira/browse/kafka-4743
2条答案
按热度按时间nfzehxib1#
在同一组中有3个使用者意味着一个分区只分配给这些使用者中的一个,因此当您调用seek时,它只对只有一个使用者的分区有效,因此其他使用者不受影响。
bmp9r5qi2#
seek()将仅回放到该分区中指定的偏移量。然而,对于旧的高级消费api,没有办法做到这一点。但在调用方法时
它将倒带到所提供主题分区的分区和主题中的偏移位置。
如果要重置组消耗,则必须使用每个分区的偏移量对topicpartition进行seek调用。
或者您可以使用offsetsfortimes并调用组分区,如下所示:
如果您使用的是较旧的高级使用者,则当前没有api来重置使用者中的偏移量。唯一的方法是停止所有使用者并手动重置zk中该使用者组的偏移量。
但是,在0.11.0版本中,添加了一个工具来重置具有不同范围(如主题、分区等)的组的偏移量。您可以在此处找到详细信息:https://cwiki.apache.org/confluence/display/kafka/kip-122%3a+add+reset+consumer+group+offsets+tooling . 以及https://issues.apache.org/jira/browse/kafka-4743