我想有一个Kafka消费者,从一个主题的最新消息开始。
以下是java代码:
private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static
{
properties.setProperty("bootstrap.servers","localhost");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "latest");
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("mytopic"));
}
@Override
public StreamHandler call() throws Exception
{
while (true)
{
ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
for(ConsumerRecord<String, String> rec : records)
{
System.out.println(rec.value());
}
}
}
虽然auto.offset.reset的值是最新的,但是使用者从属于2天前的消息开始,然后它会赶上最新的消息。
我错过了什么?
2条答案
按热度按时间pbpqsu0x1#
你以前用同样的方法运行过同样的代码吗
group.id
? 这个auto.offset.reset
参数仅在尚未为使用者存储现有偏移量时使用。因此,如果您以前运行过这个示例,比如说两天前,然后再次运行它,它将从上次使用的位置开始。使用
seekToEnd()
如果您想手动转到主题的结尾。看到了吗https://stackoverflow.com/a/32392174/1392894 更深入的讨论。
nfeuvbwi2#
如果要手动控制偏移的位置,则需要设置enable.auto.commit=false。
如果要将所有偏移定位到每个分区的末尾,请调用seektoend()
https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#seektoend(java.util.collection)集合