按日期列出的apache kafka消费者

zrfyljdw  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(389)

Kafka0.8.2是低层次还是高层次的消费者,可以在特定的时间段/日期消费Kafka客户,spark kafka,…)

yqhsw0fo

yqhsw0fo1#

使用偏移量和时间戳:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            Set<TopicPartition> assignments = consumer.assignment();
            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition topicPartition : assignments) {
                query.put(topicPartition, date);
            }

            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

            result.entrySet().stream().forEach(entry -> consumer.seek(entry.getKey(),
                    Optional.ofNullable(entry.getValue()).map(OffsetAndTimestamp::offset).orElse(new Long(0))));

            flag = false;

        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
vcirk6k6

vcirk6k62#

不,没有。必须转到0.10才能获得这种功能。

相关问题