我在kafka broker中有一个主题有3个分区,1个分区有消息,2个分区是空的,我如何知道在一个调用中使用哪个分区?
首先我分配一个分区等于的topicpartition kafka.PartitionAny
,但该值始终返回-1
所以我必须手动使用一个计数器,当我成功地从一个分区使用空消息时,那么 count++
从下一个开始,直到我找到信息
for{
partitions = append(partitions, kafka.TopicPartition{
Topic: &topic,
Partition: partition,
Offset: offSet,
Error: err,
})
err = c.Assign(partitions)
if err != nil {
return err
}
// retrieve message
ev, err := c.Poll(-1)
if err != nil {
return err
}
// if no message, check the next partition
if ev == nil{
partition++
}else{
break
}
}
前两轮不返回任何消息,但必须等待第三轮返回,有没有办法自动检测哪个分区存储了未使用的消息?
如果没有别的办法,Kafka能帮我做循环路线吗?否则我就得自己记录计数器
谢谢!:)
1条答案
按热度按时间vatpfxk51#
一定要使用subscribe()方法,然后调用poll()。如果有可用的记录,您将得到一个包含一个或多个记录(与不同分区关联)的记录答案。
处理完记录后,您可以手动提交(如果您使用
enable.auto.commit = false
)使用记录中包含的medata(主题、分区、偏移量等…)。扬尼克