为什么Kafka的seektobeing和seektoend不能与assign一起工作?

6yoyoihd  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(502)

比方说,我想检查kafka中第一条和最后一条消息的偏移量,以确定一个特定的分区。我的想法是使用 assign(…) 方法以及 seekToBeginning(…) 以及 seekToEnd(…) . 不幸的是,这不起作用。
如果我设置 AUTO_OFFSET_RESET_CONFIG"latest" ,的 seekToBeginning(…) 没有效果;如果我把它设为 "earliest" , seekToEnd(…) 不起作用。似乎对我的消费者来说唯一重要的是 AUTO_OFFSET_RESET_CONFIG .
我见过一个类似的主题,但问题涉及到 subscribe() ,而不是 assign() 方法。建议的解决方案是实施 ConsumerRebalanceListner 并将其作为参数传递给 subscribe() 方法。不幸的是 assign() 方法只有一个签名,并且只能获取主题分区列表。
问题是:是否可以使用 seekToBeginning() 或者 seekToEnd()assign() 方法。如果是,怎么做?如果没有,为什么?
我的代码的相关片段:

KafkaConsumer<String, ProtoMeasurement> consumer = createConsumer();
TopicPartition zeroP = new TopicPartition(TOPIC, 1);
List<TopicPartition> partitions = Collections.singletonList(zeroP);

consumer.assign(partitions);
consumer.poll(Duration.ofSeconds(1));
consumer.seekToBeginning(partitions);
long currOffsetPos = consumer.position(zeroP);
LOGGER.info("Current offset {}.", currOffsetPos);
ConsumerRecords<String, ProtoMeasurement> records = consumer.poll(Duration.ofMillis(100));
// ...

记录器打印偏移量n,这是所考虑主题的最大(最新)偏移量。

yqhsw0fo

yqhsw0fo1#

比方说,我想检查kafka中第一条和最后一条消息的偏移量,以确定某个特定分区
你可以用 beginningOffsets 以及 endOffsets 为了这个。
问题是:是否可以使用 seekToBeginning() 或者 seekToEnd() 使用assign()
你得打电话 poll() 过了一段时间 seekToBeginning 或者 seekToEnd :
此函数的计算是延迟的,仅在调用poll(duration)或position(topicpartition)时才查找所有分区中的第一个偏移量

wmtdaxz3

wmtdaxz32#

我注意到这种行为在mockconsumer中是错误和不一致的。这些文档说它们是懒惰的,但是在position()调用后会触发。但对于消费者来说,情况并非如此。特别是,我发现mockconsumer在大约1.0到2.2.2之间工作,在2.3.0之后就坏了
取而代之的是,我选择做以下几点,这在模拟消费者和真实消费者中都是一致的:

// consistently working seed to beginning
consumer.beginningOffsets(partitions).forEach(consumer::seek);
// consistently working seed to end
consumer.endOffsets(partitions).forEach(consumer::seek);

如果有线程同时调用poll,这就有点危险了,但在我的例子中效果很好,我只想在应用程序开始轮询时手动控制偏移位置。

相关问题