获取kafka分区在时间戳之后的偏移量

wpcxdonn  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(711)

我试着从一个开始的时间到一个结束的时间阅读Kafka的主题,在这个时间间隔之外多读几条信息是可以的,但我肯定要处理这个时间间隔内的所有信息。我检查了simple consumer并找到getoffsetbefore(),它将在开始时间之前提供偏移量。但是我不知道如何在结束时间之后获得每个分区的偏移量。请帮帮我!

ymdaylpp

ymdaylpp1#

Kafka消费api自0.10.1版起提供

/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * Notice that this method may block indefinitely if the partition does not exist.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws IllegalArgumentException if the target timestamp is negative.
 */
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
        // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
        // OffsetAndTimestamp is always positive.
        if (entry.getValue() < 0)
            throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                    entry.getValue() + ". The target time cannot be negative.");
    }
    return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}
ltqd579y

ltqd579y2#

没有人能保证结束时间,因为没有人能预见未来。
假设您知道起始偏移量,并将所有数据读取到主题末尾。可能还有一个制作人,他写了一个时间戳属于你的记录。。。
注意,kafka的记录时间戳是元数据,因此,任何记录都可以有任何时间戳。代理不会以任何方式解释此时间戳(只有streams api会这样做)。因此,kafka代理只保证基于偏移量的消息排序,而不保证基于时间戳的排序。如果记录不按时间排序,即偏移量较大的记录的时间戳比偏移量较小的记录的时间戳小,则该记录是所谓的“延迟记录”(与时间有关),并且延迟没有上限。
您只能在业务逻辑中决定要阅读的内容。因此,给定起始偏移量,您只需消费消息,同时监视时间戳。然后,您可以在看到时间戳大于间隔的第一条记录时停止处理—这将是最严格的处理,并且不允许任何延迟到达的记录。“遗漏”某些数据的概率相对较高。
或者应用限制性较小的上限,并一直读取,直到看到时间戳大于的记录为止 interval upper bound + XX 作为您选择的配置参数。同样大 X 因为你错过任何记录的可能性都很小。

相关问题