我正试着读Kafka最后一段的主题,但我不能正确地读:
在属性文件中,我设置了:
group.id=我的\u组
client.id=我的客户
enable.auto.commit=真
auto.offset.reset=最早
isolation.level=已提交读取
然后是代码:
consumer = new KafkaConsumer<>(props); // read properties
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
consumerRecords.forEach(consumerRecord -> System.out.println(consumerRecord.offset());
即使主题中有一些项目,我也可以看到偏移量是如何从0打印的。
在日志文件中,我可以看到以下内容(简称):
[consumer clientid=my_client,groupid=my_group]在第1代完成了组的分配:{my_client-f1678be7-ce6b-48e8-acf2-741ab28f7266=assignment(partitions=[mytopic-0])}
[consumer clientid=my\u client,groupid=my\u group]已成功加入具有第1代[consumer clientid=my\u client,groupid=my\u group]的组,并将新的分配通知分配者(partitions=[mytopic-0])
[consumer clientid=my\u client,groupid=my\u group]添加新分配的分区:mytopic-0
[consumer clientid=my\u client,groupid=my\u group]未找到分区mytopic-0的提交偏移量
[consumer clientid=my\u client,>groupid=my\u group]正在将分区mytopic-0的偏移量重置为偏移量0。
知道我做错什么了吗?我用seektobeginning()+poll()+commitsync()+seektoend()尝试了一些魔术,但不知怎么的,我认为这应该在默认情况下起作用。
1条答案
按热度按时间bz4sfanl1#
auto.offset.reset=最新版本将解决您的问题
https://docs.confluent.io/platform/current/clients/consumer.html#offset-管理