from datetime import datetime
from kafka import KafkaConsumer, TopicPartition
topic = "www.kilskil.com"
broker = "localhost:9092"
# Let's count the messages from the first day of the new year.
date_in = datetime(2019,1,1)
date_out = datetime(2019,1,2)
consumer = KafkaConsumer(topic, bootstrap_servers=broker, enable_auto_commit=True)
consumer.poll() # we need to read a message or make a dummy poll before seeking to the right position
tp = TopicPartition(topic, 0) # partition no. 0
# In the simple case, without any special Kafka configuration, there is only one partition per topic,
# and its number is 0.
# In fact, you asked how to use two methods: offsets_for_times() and seek().
# Kafka timestamps are integer milliseconds, so convert the seconds explicitly.
rec_in = consumer.offsets_for_times({tp: int(date_in.timestamp() * 1000)})
rec_out = consumer.offsets_for_times({tp: int(date_out.timestamp() * 1000)})
consumer.seek(tp, rec_in[tp].offset) # let's go to the first message of the new year!
c = 0
for msg in consumer:
    if msg.offset >= rec_out[tp].offset:
        break
    c += 1
    # the message also has a .timestamp field
print("{c} messages between {_in} and {_out}".format(c=c, _in=str(date_in), _out=str(date_out)))
2 Answers
Answer #1 (xu3bshqb)
Use offsets_for_times() to get the offset that corresponds to the required timestamp. In Python, it looks something like the code shown above.
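The timestamps passed to offsets_for_times() are epoch milliseconds. As a minimal sketch of that conversion: to_epoch_ms is a hypothetical helper (it is not part of kafka-python), and it assumes that a naive datetime means UTC, whereas calling .timestamp() directly on a naive datetime, as the code above does, interprets it in the machine's local time zone.

```python
from datetime import datetime, timezone


def to_epoch_ms(dt):
    """Return dt as integer milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        # Assumption for this sketch: naive datetimes are treated as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    # datetime.timestamp() returns float seconds; Kafka expects integer milliseconds.
    return int(dt.timestamp() * 1000)


start_ms = to_epoch_ms(datetime(2019, 1, 1))
end_ms = to_epoch_ms(datetime(2019, 1, 2))
```

The resulting values could then be used as the dictionary values in the offsets_for_times() call, for example {tp: start_ms}.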
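Don't forget that Kafka measures timestamps in milliseconds and stores them as a long. Python's datetime library returns the timestamp in seconds, so we need to multiply it by 1000. The offsets_for_times method returns a dict with TopicPartition keys and OffsetAndTimestamp values.
Answer #2 (xytpbqjk)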
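You can find the earliest offset at the start of the specified time interval and rewind to that offset. However, it is hard to know where the interval ends, because a record with an earlier timestamp may arrive later. So you can consume records from the start of the interval until you find a record whose timestamp is later than the end time, and then read some more records to catch late messages.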
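The stopping rule described above can be sketched roughly as follows. This is only an illustration under stated assumptions: count_in_window, the Record tuple, and the grace parameter are hypothetical names, and the function works on any iterable of objects with a .timestamp attribute in milliseconds, so it can be tried without a broker.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumed record; only .timestamp (ms) is used.
Record = namedtuple("Record", ["offset", "timestamp"])


def count_in_window(records, start_ms, end_ms, grace=100):
    """Count records with start_ms <= timestamp < end_ms.

    After the first record at or past end_ms is seen, up to `grace`
    further records are read so that late arrivals are still counted.
    """
    count = 0
    extra_left = None  # None until the first record past the end is seen
    for rec in records:
        if start_ms <= rec.timestamp < end_ms:
            count += 1
        if extra_left is None:
            if rec.timestamp >= end_ms:
                extra_left = grace  # start the grace countdown
        else:
            extra_left -= 1
        if extra_left is not None and extra_left <= 0:
            break
    return count


# The record with timestamp 30 arrives after one with timestamp 110.
sample = [Record(i, ts) for i, ts in enumerate([10, 20, 110, 30, 120, 130, 40])]
late_aware = count_in_window(sample, start_ms=10, end_ms=100, grace=2)
strict = count_in_window(sample, start_ms=10, end_ms=100, grace=0)
```

With a real consumer, the iterable would be the consumer itself after seeking to the start offset, since each message carries a .timestamp field. How large grace should be depends on how late messages can arrive in your setup; the default of 100 here is arbitrary.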
The code to rewind to the start time is: