Kafka消费者不从最新消息开始

ndh0cuux  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(436)

我想有一个Kafka消费者,从一个主题的最新消息开始。
以下是java代码:

private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static
{
    properties.setProperty("bootstrap.servers","localhost");
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("group.id", "test");
    properties.setProperty("auto.offset.reset", "latest");
    consumer = new KafkaConsumer<>(properties);

    consumer.subscribe(Collections.singletonList("mytopic"));
}

@Override
public StreamHandler call() throws Exception
{
    while (true) 
    {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
        Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
        for(ConsumerRecord<String, String> rec : records)
        {
            System.out.println(rec.value());
        }
    }
}

虽然auto.offset.reset的值是最新的,但是使用者从属于2天前的消息开始,然后它会赶上最新的消息。
我错过了什么?

pbpqsu0x

pbpqsu0x1#

你以前用同样的方法运行过同样的代码吗 group.id ? 这个 auto.offset.reset 参数仅在尚未为使用者存储现有偏移量时使用。因此,如果您以前运行过这个示例,比如说两天前,然后再次运行它,它将从上次使用的位置开始。
使用 seekToEnd() 如果您想手动转到主题的结尾。
看到了吗https://stackoverflow.com/a/32392174/1392894 更深入的讨论。

nfeuvbwi

nfeuvbwi2#

如果要手动控制偏移的位置,则需要设置enable.auto.commit=false。
如果要将所有偏移定位到每个分区的末尾,请调用seektoend()
https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#seektoend(java.util.collection)集合

相关问题