我可以根据给定的分区id和偏移量列表使用kafka消息吗?

wrrgggsh  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(353)

假设我有一个kafka主题,有4个分区,每个分区都有一些提交的消息。出于某种原因,我想基于给定的分区id和偏移量#重新播放一些已提交的消息,使用java客户机lib最好的方法是什么?
例如我有
托皮卡:
分区1:…偏移-1,偏移-3,偏移-7。。。
分区2:…偏移-2,偏移-4,偏移-8。。。
分区3:…偏移-5,偏移-6,偏移-9。。。
分区4:…偏移-10,偏移-11,偏移-12。。。
我只想重演
分区1:偏移-3
分区2:偏移-8
分区3:偏移-5
所以我有下面这样的代码

props.put("max.poll.records", "1"); // to make sure I only get exactly one desired message on that offset 

({(1,3),(2,8),(3,5)}).stream(part_offset-> {
  int i=1; // used as loop count down latch
while(i>=0){
 consumer.assign(get_partition(part_offset.part));
 consumer.seek(new TopicPartition("TopicA", part_offset.part), part_offset.offset);
 records=consumer.pool(Duration.ofSeconds(1)); // I read somewhere kafka is lazy , so should I poll before this ?
 for ( record : records) {
  //do something
  i--;
 }

}

})

但上面的代码不起作用,它只是挂在那里什么也不做。只是想知道用给定的分区id和偏移量信息重放一些消息的最佳方法是什么?或者,我的消费方式不对吗?请告知
谢谢

qeeaahzv

qeeaahzv1#

对不起,我的错。在轮询时,我没有为“无现有偏移量”设置阻止符。所以下面的代码在我设置了重试次数的上限之后运行得很好,可以随意发表评论了,但是我想知道使用int来停止这样的重试是否是最好的做法?谢谢

props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "1");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "appname-message-patching");

        List<Pair<Integer,Integer>> partition_offset= Arrays.asList(Pair.of(0,1),Pair.of(0,21),
                Pair.of(1,31) );

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        partition_offset.forEach(po-> {
            TopicPartition tp=new TopicPartition(topicName, po.getKey());
            consumer.assign(Arrays.asList(tp));
            consumer.seek(tp,po.getValue());
            int counter=1;//only get 1 message , double guards for max.poll.records ?
            int retry=5;//retry 5 times then give up if cannot receive anything
            while (counter>0 ) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

                if(retry>0 || !records.isEmpty()) {
                    for (ConsumerRecord<String, String> record : records) {
                        counter--;
                        System.out.printf("==================thread name = %s , partation = %d , offset = %d , key = %s , value = %s\n", Thread.currentThread().getName(), record.partition(), record.offset(), record.key(), record.value());
                    }
                    retry--;
                }
                if (retry==0||counter==0){
                    counter=0;break;
                }
            }
        });
        consumer.close();

相关问题