可用的库是sarama(或其扩展sarama集群),但是没有提供消费者组示例,sarama和sarama集群中都没有。我不懂api。我可以举一个为某个主题创建消费者组的例子吗?
vwkv1x7d1#
使用者组由集群使用者“构造函数”的第二个参数指定。下面是一个非常基本的草图:
import ( "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" ) conf := cluster.NewConfig() // add config values brokers := []string{"kafka-1:9092", "kafka-2:9092"} group := "Your-Consumer-Group" topics := []string{"topicName"} consumer := cluster.NewConsumer(broker, group, topics, conf)
因此,您将拥有一个属于指定消费组的消费者。
xqnpmsa82#
不需要使用sarama集群库。对于apachekafka集成,不推荐使用它。 Sarama 原始库本身提供了一种使用使用者组连接到kafka集群的方法。我们需要创建客户机,然后初始化消费组,在其中创建声明并等待消息通道接收消息。正在初始化客户端:-
Sarama
kfversion, err := sarama.ParseKafkaVersion(kafkaVersion) // kafkaVersion is the version of kafka server like 0.11.0.2 if err != nil { log.Println(err) } config := sarama.NewConfig() config.Version = kfversion config.Consumer.Return.Errors = true // Start with a client client, err := sarama.NewClient([]string{brokerAddr}, config) if err != nil { log.Println(err) } defer func() { _ = client.Close() }()
与消费群体的联系:-
// Start a new consumer group group, err := sarama.NewConsumerGroupFromClient(consumer_group, client) if err != nil { log.Println(err) } defer func() { _ = group.Close() }()
开始使用来自主题分区的消息:-
// Iterate over consumer sessions. ctx := context.Background() for { topics := []string{topicName} handler := &Message{} err := group.Consume(ctx, topics, handler) if err != nil { log.Println(err) } }
最后一部分是等待消息通道消费消息。我们需要实现所有的功能(三)来实现 ConsumerGroupHandler 接口。
ConsumerGroupHandler
func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil } func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil } func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error { for msg := range claim.Messages() { fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset) sess.MarkMessage(msg, "") } return nil }
更多关于Kafka使用戈朗的信息,请查看萨拉马图书馆。
2条答案
按热度按时间vwkv1x7d1#
使用者组由集群使用者“构造函数”的第二个参数指定。下面是一个非常基本的草图:
因此,您将拥有一个属于指定消费组的消费者。
xqnpmsa82#
不需要使用sarama集群库。对于apachekafka集成,不推荐使用它。
Sarama
原始库本身提供了一种使用使用者组连接到kafka集群的方法。我们需要创建客户机,然后初始化消费组,在其中创建声明并等待消息通道接收消息。
正在初始化客户端:-
与消费群体的联系:-
开始使用来自主题分区的消息:-
最后一部分是等待消息通道消费消息。我们需要实现所有的功能(三)来实现
ConsumerGroupHandler
接口。更多关于Kafka使用戈朗的信息,请查看萨拉马图书馆。