有没有可能将偏移量提交到具有多个分区的kafka主题,以便将偏移量1提交到分区1,将偏移量2提交到p2,依此类推?
编辑:
是的,有可能:
consumer = KafkaConsumer()
topicpartitions = [TopicPartition('topicname', partitionId) for partitionId in consumer.partitions_for_topic('topicname')]
consumer.assign(topicpartitions)
for tp in topicpartitions:
consumer.commit({tp: OffsetAndMetadata(1000, None)})
for msg in consumer:
#do whatever
1条答案
按热度按时间cclgggtu1#
Kafka偏移量总是每个分区。我的意思是,如果您的主题有2个分区,那么p0中的消息将从偏移量0开始,并为每个新消息增加my 1。p1相似性中的消息从偏移量0开始并增加1。
因此,如果您发布了两条消息(没有密钥),一条将进入偏移量为0的分区0,另一条将进入偏移量为0的分区1。
现在,如果另一个应用程序正在使用此主题并提交其偏移量,那么它将在
__consumer_offsets
主题,包括其group.id、主题、分区号和偏移量。例如,{“myconsumerid”,“mytopic”,p0,1}和{“myconsumerid”,“mytopic”,p1,1}。如果应用程序停止,并且一个或两个其他使用者以相同的group.id启动,则它们将从为其分配分区的最后一个提交的偏移量开始继续。
如果要将组偏移重新定位到任何其他位置,可以使用0.11 kafka工具更改组的提交偏移
bin/kafka-consumer-groups.sh—重置偏移量
如果您给每个分区指定了正确的标志,这个工具将允许您独立地设置它的偏移量。
如果愿意,可以从python程序中调用此工具。应该首先关闭消费组中的所有现有消费者,否则他们可能会重写偏移量。
如果要编写此工具的python版本,而不是运行现有的cli命令,则需要找到支持seek()的python客户端,这样就可以将偏移量更改为所需的偏移量,然后在消费应用程序重新启动时将其提交到该位置。另一种方法是放弃动态分区分配,手动分配()要更改的分区,并将偏移提交到分配的列表中。不能在同一应用程序中同时使用动态管理的分区订阅和手动分配的分区。
您还需要确保在这些分区上使用相同使用者组的所有其他使用者都已关闭,或者一旦其他使用者自动提交或手动提交其对您刚刚设置的偏移量的偏移量,提交的偏移量就会被其他使用者覆盖。