在Kafka如何得到准确的偏移量根据生产时间

e7arh2l6  于 2021-06-07  发布在  Kafka
关注(0)|答案(5)|浏览(392)

我需要在一天中一小时一小时地得到Kafka的信息。每一小时我将启动一个作业来使用1小时前生成的消息。e、 例如,如果当前时间是20:12,我将在19:00:00到19:59:59之间使用消息。这意味着我需要在19:00:00开始偏移,在19:59:59结束偏移。我之前使用了simpleconsumer.getoffsetsb,如“0.8.0 simpleconsumer示例”中所示。问题是返回的偏移量与作为参数给定的时间戳不匹配。e、 g.当生成时间戳19:00:00时,我得到时间16:38:00生成的消息。

gojuced7

gojuced71#

Kafka1.10确实支持时间戳,尽管使用它来做你想做的事情仍然是一个小挑战。但是如果你知道你想从哪个时间戳开始读,直到你想读为止,那么你就可以轮询消息直到那个时间,并且停止消费。

ojsjcaue

ojsjcaue2#

正如其他答复所指出的那样,Kafka的旧版本只有一种将时间Map到偏移量的近似方法。然而,自从kafka 0.10.0(2016年5月发布)以来,kafka为每个主题都保留了一个时间索引。这将允许您有效地从时间到精确的偏移量。您可以使用kafkaconsumer#offsetsfortimes方法访问此信息。
kip-33设计讨论页上有关于如何实现基于时间的索引的更多细节。

hmtdttj4

hmtdttj43#

Kafka消费api方法 getOffsetsByTimes() 可用于此,可从0.10.0或更高版本获得。参见javadoc。

/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * Notice that this method may block indefinitely if the partition does not exist.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws IllegalArgumentException if the target timestamp is negative.
 */
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
        // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
        // OffsetAndTimestamp is always positive.
        if (entry.getValue() < 0)
            throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                    entry.getValue() + ". The target time cannot be negative.");
    }
    return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}
kmb7vmvb

kmb7vmvb4#

向您展示代码:

public static Map<TopicPartition, OffsetAndTimestamp> getOffsetAndTimestampAtTime(String kafkaServer, String topic, long time) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
    kafkaParams.put(GROUP_ID_CONFIG, "consumerGroupId");
    kafkaParams.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(AUTO_OFFSET_RESET_CONFIG, "latest");
    kafkaParams.put(ENABLE_AUTO_COMMIT_CONFIG, false);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaParams);

    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

    List<TopicPartition> topicPartitions = partitionInfos
            .stream()
            .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
            .collect(Collectors.toList());

    Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> time));

    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(topicPartitionToTimestampMap);
    consumer.close();
    return result;
}
qnzebej0

qnzebej05#

在Kafka目前没有办法得到一个偏移量,对应于一个特定的时间戳-这是设计。正如jaykreps的日志文章顶部所描述的,偏移量数字为日志提供了一种时间戳,它与挂钟时间分离。用偏移量作为时间的概念,你就可以知道任何两个系统是否处于一致的状态,只要知道它们读到什么偏移量就行了。从来没有任何关于不同服务器上的不同时钟时间,闰年,白天光节省时间,时区等混淆。它有点不错。。。
现在。。。总之,如果你知道你的服务器在某个时候坏了,那么实际上,你真的很想知道相应的偏移量。你可以靠近。kafka机器上的日志文件是根据它们开始写入的时间命名的,并且存在一个kafka工具(我现在找不到),可以让您知道哪些偏移与这些文件关联。如果你想知道确切的时间戳,那么你必须在发送给Kafka的消息中对时间戳进行编码。

相关问题