假设我有一个kafka主题,有4个分区,每个分区都有一些提交的消息。出于某种原因,我想基于给定的分区id和偏移量#重新播放一些已提交的消息,使用java客户机lib最好的方法是什么?
例如我有
托皮卡:
分区1:…偏移-1,偏移-3,偏移-7。。。
分区2:…偏移-2,偏移-4,偏移-8。。。
分区3:…偏移-5,偏移-6,偏移-9。。。
分区4:…偏移-10,偏移-11,偏移-12。。。
我只想重演
分区1:偏移-3
分区2:偏移-8
分区3:偏移-5
所以我有下面这样的代码
props.put("max.poll.records", "1"); // to make sure I only get exactly one desired message on that offset
({(1,3),(2,8),(3,5)}).stream(part_offset-> {
int i=1; // used as loop count down latch
while(i>=0){
consumer.assign(get_partition(part_offset.part));
consumer.seek(new TopicPartition("TopicA", part_offset.part), part_offset.offset);
records=consumer.pool(Duration.ofSeconds(1)); // I read somewhere kafka is lazy , so should I poll before this ?
for ( record : records) {
//do something
i--;
}
}
})
但上面的代码不起作用,它只是挂在那里什么也不做。只是想知道用给定的分区id和偏移量信息重放一些消息的最佳方法是什么?或者,我的消费方式不对吗?请告知
谢谢
1条答案
按热度按时间qeeaahzv1#
对不起,我的错。在轮询时,我没有为“无现有偏移量”设置阻止符。所以下面的代码在我设置了重试次数的上限之后运行得很好,可以随意发表评论了,但是我想知道使用int来停止这样的重试是否是最好的做法?谢谢