kafka:如何使用基于时间戳的数据

4bbkushb  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(407)

我想知道除了偏移量之外,是否还有其他方法来获取关于时间间隔的数据?我想把昨天的所有日期都消耗掉,我该怎么做?

xu3bshqb

xu3bshqb1#

使用offsetsfortimes获得与所需时间戳相关的正确偏移量。在python中,它将类似于下一个:

from datetime import datetime
from kafka import KafkaConsumer, TopicPartition

topic  = "www.kilskil.com" 
broker = "localhost:9092"

# lets check messages of the first day in New Year

date_in  = datetime(2019,1,1)
date_out = datetime(2019,1,2)

consumer = KafkaConsumer(topic, bootstrap_servers=broker, enable_auto_commit=True)
consumer.poll()  # we need to read message or call dumb poll before seeking the right position

tp      = TopicPartition(topic, 0) # partition n. 0

# in simple case without any special kafka configuration there is only one partition for each topic channel

# and it's number is 0

# in fact you asked about how to use 2 methods: offsets_for_times() and seek()

rec_in  = consumer.offsets_for_times({tp:date_in.timestamp() * 1000})
rec_out = consumer.offsets_for_times({tp:date_out.timestamp() * 1000})

consumer.seek(tp, rec_in[tp].offset) # lets go to the first message in New Year!

c = 0
for msg in consumer:
  if msg.offset >= rec_out[tp].offset:
    break

  c += 1
  # message also has .timestamp field

print("{c} messages between {_in} and {_out}".format(c=c, _in=str(date_in), _out=str(date_out)))

别忘了,kafka以毫秒为单位度量时间戳,它有long类型。python lib datetime以秒为单位返回时间戳,因此我们需要将其乘以1000。方法 offsets_for_times 返回带有 TopicPartition 钥匙和 OffsetAndTimestamp 价值观。

xytpbqjk

xytpbqjk2#

您可以找到指定时间间隔开始处的最早偏移量,并回放到此偏移量。然而,很难理解时间间隔的终点在哪里,因为具有最早时间戳的记录可能稍后到达。因此,您可以从间隔的开始使用记录,直到找到时间戳晚于结束时间的记录,再加上一些记录来捕获延迟的消息。
倒带到开始时间的代码是:

public void rewind(DateTime time) {
    Set<TopicPartition> assignments = consumer.assignment();
    Map<TopicPartition, Long> query = new HashMap<>();
    for (TopicPartition topicPartition : assignments) {
        query.put(topicPartition, time.getMillis());
    }
    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

    result.entrySet().stream().forEach(entry -> consumer.seek(entry.getKey(),
            Optional.ofNullable(entry.getValue()).map(OffsetAndTimestamp::offset).orElse(new Long(0))));
}

相关问题