Kafka: which Java configuration property lets me read all records in a topic from the beginning (offset)?

velaa5lx  posted on 2023-03-17  in  Apache
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"");

// earliest --> consumes from offset n, where the record at offset n-1 is the last one the consumer consumed before shutting down

// latest --> consumes only the records produced after the consumer has started consuming
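The behaviour described in the two comments above depends on whether the consumer group already has a committed offset: `auto.offset.reset` is only consulted when there is no committed offset, or when the committed offset is out of range. The following is a minimal illustrative model of that rule, not Kafka's actual implementation; the class and method names (`StartPositionModel`, `startPosition`) are hypothetical.

```java
import java.util.OptionalLong;

/** Illustrative model of where a consumer starts reading; not Kafka's real code. */
final class StartPositionModel {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * @param committed       last committed offset of the group for this partition, if any
     * @param policy          value of auto.offset.reset
     * @param beginningOffset first offset still available in the partition
     * @param endOffset       offset that the next produced record will get
     */
    static long startPosition(OptionalLong committed, ResetPolicy policy,
                              long beginningOffset, long endOffset) {
        // A valid committed offset always wins; the reset policy is ignored.
        if (committed.isPresent()
                && committed.getAsLong() >= beginningOffset
                && committed.getAsLong() <= endOffset) {
            return committed.getAsLong();
        }
        // No usable committed offset: fall back to the reset policy.
        switch (policy) {
            case EARLIEST:
                return beginningOffset;
            case LATEST:
                return endOffset;
            default:
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        // Existing group "mygroup" that already committed offset 42: resumes at 42.
        System.out.println(startPosition(OptionalLong.of(42), ResetPolicy.EARLIEST, 0, 100)); // 42
        // Brand-new group with earliest: starts at the beginning.
        System.out.println(startPosition(OptionalLong.empty(), ResetPolicy.EARLIEST, 0, 100)); // 0
        // Brand-new group with latest: only sees records produced from now on.
        System.out.println(startPosition(OptionalLong.empty(), ResetPolicy.LATEST, 0, 100)); // 100
    }
}
```

This is why setting `earliest` on a group that has consumed before does not replay the topic.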

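Is there a way to set the offset to 0 and then consume from the beginning?
If you want to check my code, here is my gist link:

I tried putting these lines before the while loop, but it did not work.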

consumer.poll(Duration.ZERO); 
consumer.seekToBeginning(consumer.assignment());

I just want to write code that does the same thing as this command:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myjson --from-beginning

If you don't understand my question, please let me know in the comments.
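One way to approximate `--from-beginning` with configuration alone is to combine `auto.offset.reset=earliest` with a group id that has never committed offsets, which is roughly what the console consumer does when no group is given. Below is a minimal sketch using the plain string keys behind the `ConsumerConfig` constants, so it compiles without the Kafka client on the classpath. The class name `FromBeginningConfig`, the `from-beginning-` group prefix and the choice to disable auto-commit are assumptions made for illustration.

```java
import java.util.Properties;
import java.util.UUID;

/** Builds consumer properties that make a new consumer start at the earliest offset. */
final class FromBeginningConfig {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A fresh group id has no committed offsets, so auto.offset.reset applies.
        props.setProperty("group.id", "from-beginning-" + UUID.randomUUID());
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // With no committed offset, start from the first available record.
        props.setProperty("auto.offset.reset", "earliest");
        // Assumption: do not leave committed offsets behind for the throwaway group.
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object can be passed to `new KafkaConsumer<>(props)` in place of the `props` built in the question. Reusing a fixed group id such as `mygroup` would bring back the original behaviour once that group has committed offsets.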

dxxyhpgq  #1

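import static java.time.temporal.ChronoUnit.MINUTES;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Only has an effect once partitions are assigned: consumer.assignment()
// stays empty until the consumer has joined the group during poll().
private void adjustOffset(KafkaConsumer<byte[], byte[]> consumer) {
    Set<TopicPartition> assignments = consumer.assignment();
    switch (ConfAttr.KAFKA_OFFSET_ADJUST_TYPE) {
        case "0":
            // Keep the current (committed) position.
            break;
        case "1":
            // Rewind every assigned partition to its first available offset.
            consumer.seekToBeginning(assignments);
            break;
        case "2": {
            // Jump to a fixed offset on every assigned partition.
            // Offsets are longs, so parse with Long.parseLong.
            long offset = Long.parseLong(ConfAttr.KAFKA_OFFSET_ADJUST_VALUE);
            assignments.forEach(topicPartition -> consumer.seek(topicPartition, offset));
            break;
        }
        case "3": {
            // Start from the first record written within the last N minutes.
            long since = Instant.now()
                    .minus(Long.parseLong(ConfAttr.KAFKA_OFFSET_ADJUST_VALUE), MINUTES)
                    .toEpochMilli();
            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition topicPartition : assignments) {
                query.put(topicPartition, since);
            }

            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

            // A null value means the partition has no record at or after
            // the timestamp; fall back to offset 0 in that case.
            result.forEach((topicPartition, offsetAndTimestamp) ->
                    consumer.seek(
                            topicPartition,
                            offsetAndTimestamp == null ? 0L : offsetAndTimestamp.offset()));
            break;
        }
    }
    adjustOffsetFlag = false;
}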
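The method above only does something once `consumer.assignment()` is non-empty. That is also the likely reason the `poll(Duration.ZERO)` attempt in the question had no effect: with a zero timeout, `poll` can return before the group join has finished, so `seekToBeginning` is called on an empty set. One common alternative is to seek inside a `ConsumerRebalanceListener`, which is invoked as soon as partitions are assigned. The sketch below assumes the `kafka-clients` library is on the classpath and a broker at `localhost:9092`, and reuses the topic `myjson` and group `mygroup` from the question; the class name `ReadFromBeginning` is hypothetical.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadFromBeginning {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("myjson"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Nothing to clean up in this sketch.
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Runs inside poll() once the assignment is known, so the
                    // collection is never empty here. Committed offsets are ignored.
                    consumer.seekToBeginning(partitions);
                }
            });

            // Runs until the process is stopped.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```

Note that this rewinds on every rebalance, not just at startup, so a long-running consumer would re-read the topic whenever partitions are reassigned. A one-shot flag like `adjustOffsetFlag` in the answer above is one way to limit it to the first assignment.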
