经过长时间的搜索,我找到了Kafka库,它足够简单,我可以理解它。:)https://github.com/ThreeDotsLabs/watermill/blob/master/_examples/pubsubs/kafka/main.go
我的问题是我如何创建多个订阅者阅读特定主题?我试着修改上面的例子
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/IBM/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubConfig := kafka.DefaultSaramaSubscriberConfig()
saramaSubConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
brokers := []string{"127.0.0.1:9092", "127.0.0.1:9093"}
subscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{
Brokers: brokers,
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubConfig,
ConsumerGroup: "TestGroup",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "test")
if err != nil {
panic(err)
}
go process(messages, "one")
messages2, err := subscriber.Subscribe(context.Background(), "test")
if err != nil {
panic(err)
}
go process(messages2, "two")
pubisher, err := kafka.NewPublisher(kafka.PublisherConfig{
Brokers: brokers,
Marshaler: kafka.DefaultMarshaler{},
}, watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessage(pubisher)
}
func publishMessage(publisher message.Publisher) {
counter := 0
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(fmt.Sprintf("Hello world %d", counter)))
if err := publisher.Publish("test", msg); err != nil {
panic(err)
}
counter++
time.Sleep(1 * time.Millisecond)
}
}
func process(messages <-chan *message.Message, goroutineId string) {
for msg := range messages {
log.Printf("goroutine %s recived message: %s, payload: %s", goroutineId, msg.UUID, string(msg.Payload))
msg.Ack()
}
}
但似乎只有接收到“2”的go例程总是处理该消息。
3条答案
按热度按时间ni65a41a1#
在同一个消费者组中有两个消费者。
如果您的测试主题只有一个分区,那么组中只能有一个活动的消费者。
要让两个使用者处理相同的事件,您需要创建具有唯一组的订阅者
ix0qys7i2#
这是提示:)我已经为test2主题创建了两个分区
他们在这里说:https://watermill.io/pubsubs/kafka/#partitioning
所以我定义了
这里是输出:
这里的“问题”是只有一个go例程(goroutine one)从这个主题读取数据。我这次做错了什么?:)
8wigbo563#
我有两个分区。
Kafka将主题的分区分配给消费者组中的消费者,因此每个分区只由消费者组中的一个消费者消费。Kafka保证一条消息只会被消费者组中的一个消费者读取。
https://medium.com/javarevisited/kafka-partitions-and-consumer-groups-in-6-mins-9e0e336c6c00
当消费者组中有多个消费者以便他们可以从主题中读取数据时,这不就是增加吞吐量的全部想法吗?
编辑:
我想我知道问题在哪里。https://kafka.apache.org/documentation.html
具有相同事件密钥(例如,客户或车辆ID)的事件被写入相同分区,
为什么会这样:
将所有事件发送到同一个分区?:
是不是这个领域:
负责事件密钥?
编辑2::)没关系!它工作正常,我误解了一个概念。:)