如何在golang kafka 10中获得groupid?

xvw2m8pv  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(491)

我正在使用Kafka10.0和https://github.com/shopify/sarama. 我正在尝试获取消费者处理的最新消息的偏移量。
为此,我找到了newoffsetmanagerfromclient(group string,client client)方法,它需要组名。
如何获取消费者组名称?

offsets := make(map[int32]int64)

config := sarama.NewConfig()
config.Consumer.Offsets.CommitInterval = 200 * time.Millisecond
config.Version = sarama.V0_10_0_0

// config.Consumer.Offsets.Initial = sarama.OffsetNewest
cli, _ := sarama.NewClient(kafkaHost, config)
defer cli.Close()

offsetManager, _ := sarama.NewOffsetManagerFromClient(group, cli)
for _, partition := range partitions {
    partitionOffsetManager, _ := offsetManager.ManagePartition(topic, partition)
    offset, _ := partitionOffsetManager.NextOffset()

    offsets[partition] = offset
}
return offsets

我创建了一个

consumer := sarama.NewConsumer(connections, config)

但我不知道如何创建一个消费者组并获取其组名。

mlnl4t2r

mlnl4t2r1#

我想你可以用任何字符串作为groupid。请看萨拉玛·戈多克的例子

// Start a new consumer group
group, err := NewConsumerGroupFromClient("my-group", client)
if err != nil {
    panic(err)
}
defer func() { _ = group.Close() }()

也许你可以给它任何线。您应该确保其他消费者可以获得相同的groupid来加入该组。

11dmarpk

11dmarpk2#

您正在尝试创建自己的偏移管理器以查找当前偏移:

offsetManager, _ := sarama.NewOffsetManagerFromClient(group, cli)

类似地,使用主题消息的使用者必须使用相同的偏移量管理器,并且他们必须使用特定的组id。请使用该组id。

相关问题