Go的Kafka库Watermill和多个订阅者

iklwldmw  于 2023-09-28  发布在  Go
关注(0)|答案(3)|浏览(151)

经过长时间的搜索,我找到了Kafka库,它足够简单,我可以理解它。:)https://github.com/ThreeDotsLabs/watermill/blob/master/_examples/pubsubs/kafka/main.go
我的问题是:如何创建多个订阅者来读取某个特定主题?我试着修改了上面的示例:

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/IBM/sarama"
    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
    "github.com/ThreeDotsLabs/watermill/message"
)

// main creates one Kafka subscriber in consumer group "TestGroup", opens two
// subscriptions on topic "test" from it, hands each subscription to its own
// goroutine, and then publishes messages forever on the calling goroutine.
func main() {
    brokers := []string{"127.0.0.1:9092", "127.0.0.1:9093"}

    // Initial offset policy for the consumer: oldest.
    saramaCfg := kafka.DefaultSaramaSubscriberConfig()
    saramaCfg.Consumer.Offsets.Initial = sarama.OffsetOldest

    subCfg := kafka.SubscriberConfig{
        Brokers:               brokers,
        Unmarshaler:           kafka.DefaultMarshaler{},
        OverwriteSaramaConfig: saramaCfg,
        ConsumerGroup:         "TestGroup",
    }
    sub, err := kafka.NewSubscriber(subCfg, watermill.NewStdLogger(false, false))
    if err != nil {
        panic(err)
    }

    // Both subscriptions are opened on the same subscriber, so they share the
    // same consumer group configuration.
    for _, workerName := range []string{"one", "two"} {
        stream, err := sub.Subscribe(context.Background(), "test")
        if err != nil {
            panic(err)
        }
        go process(stream, workerName)
    }

    pubCfg := kafka.PublisherConfig{
        Brokers:   brokers,
        Marshaler: kafka.DefaultMarshaler{},
    }
    publisher, err := kafka.NewPublisher(pubCfg, watermill.NewStdLogger(false, false))
    if err != nil {
        panic(err)
    }

    // Blocks forever; the consumer goroutines keep running alongside it.
    publishMessage(publisher)
}

// publishMessage publishes "Hello world <n>" messages to topic "test" in an
// endless loop, sleeping one millisecond after each send. It panics if a
// publish fails and never returns.
func publishMessage(publisher message.Publisher) {
    for n := 0; ; n++ {
        payload := []byte(fmt.Sprintf("Hello world %d", n))
        msg := message.NewMessage(watermill.NewUUID(), payload)

        err := publisher.Publish("test", msg)
        if err != nil {
            panic(err)
        }

        time.Sleep(1 * time.Millisecond)
    }
}

// process logs every message received on the channel, tagged with the given
// goroutine identifier, and acknowledges it. It returns once the channel is
// closed.
func process(messages <-chan *message.Message, goroutineId string) {
    for {
        msg, open := <-messages
        if !open {
            return
        }

        payload := string(msg.Payload)
        log.Printf("goroutine %s recived message: %s, payload: %s", goroutineId, msg.UUID, payload)
        msg.Ack()
    }
}

但似乎始终只有标记为“two”的goroutine在处理消息。

ni65a41a

ni65a41a1#

在同一个消费者组中有两个消费者。
如果您的测试主题只有一个分区,那么组中只能有一个活动的消费者。
要让两个消费者都处理相同的事件,您需要让每个订阅者使用各自唯一的消费者组。

ix0qys7i

ix0qys7i2#

这是提示:)我已经为test2主题创建了两个分区

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    // "github.com/IBM/sarama"

    "github.com/IBM/sarama"
    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
    "github.com/ThreeDotsLabs/watermill/message"
)

const (
    topic = "test2"
)

// main publishes keyed messages to the topic and consumes them with two
// separately constructed subscribers that share the consumer group "TestApp".
// Each subscriber's messages are handled by its own goroutine, while the
// calling goroutine publishes forever.
func main() {
    // Initial offset policy for the consumers: oldest.
    saramaSubConfig := kafka.DefaultSaramaSubscriberConfig()
    saramaSubConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

    // The partition key of a message is whatever is stored under its
    // "partition" metadata entry.
    // NOTE(review): presumably this value is used as the Kafka message key and
    // hashed to choose a partition, so two different keys may still land on the
    // same partition. Confirm against the Watermill Kafka documentation.
    marshaler := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
        return msg.Metadata.Get("partition"), nil
    })

    // List each seed broker once. The same address used to appear twice here;
    // the second entry now matches the 9092/9093 pair used elsewhere.
    // NOTE(review): assumes the second broker listens on 9093. Confirm for
    // your cluster.
    brokers := []string{"127.0.0.1:9092", "127.0.0.1:9093"}

    publisherConfig := kafka.PublisherConfig{
        Brokers:   brokers,
        Marshaler: marshaler,
    }

    publisher, err := kafka.NewPublisher(publisherConfig,
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    // Both subscribers are built from the same config and therefore join the
    // same consumer group.
    subConfig := kafka.SubscriberConfig{
        Brokers:               brokers,
        OverwriteSaramaConfig: saramaSubConfig,
        ConsumerGroup:         "TestApp",
        Unmarshaler:           marshaler,
    }

    subscriber1, err := kafka.NewSubscriber(subConfig, watermill.NewStdLogger(false, false))
    if err != nil {
        panic(err)
    }
    messages1, err := subscriber1.Subscribe(context.Background(), topic)
    if err != nil {
        panic(err)
    }
    go process(messages1, "one")

    subscriber2, err := kafka.NewSubscriber(subConfig, watermill.NewStdLogger(false, false))
    if err != nil {
        panic(err)
    }
    messages2, err := subscriber2.Subscribe(context.Background(), topic)
    if err != nil {
        panic(err)
    }
    go process(messages2, "two")

    // Blocks forever; the consumer goroutines keep running alongside it.
    publish(publisher)
}

// publish sends "Hello, world! <n>" messages to the topic in an endless loop,
// tagging even-numbered messages with partition metadata "one" and
// odd-numbered ones with "two", and sleeping 500 ms after each send. It panics
// if a publish fails and never returns.
func publish(publisher *kafka.Publisher) {
    for n := 0; ; n++ {
        partitionKey := "two"
        if n%2 == 0 {
            partitionKey = "one"
        }

        body := []byte(fmt.Sprintf("Hello, world! %d", n))
        msg := message.NewMessage(watermill.NewShortUUID(), body)
        msg.Metadata.Set("partition", partitionKey)

        if err := publisher.Publish(topic, msg); err != nil {
            panic(err)
        }

        time.Sleep(500 * time.Millisecond)
    }
}

// process logs each message received on the channel together with the worker
// id and the message's "partition" metadata value, then acknowledges it. It
// returns once the channel is closed.
func process(messages <-chan *message.Message, id string) {
    for {
        msg, open := <-messages
        if !open {
            return
        }

        partitionKey := msg.Metadata.Get("partition")
        log.Printf("go routine: %s, received message: %s, payload: %s form partition: %s", id, msg.UUID, string(msg.Payload), partitionKey)

        // Acknowledge the message once it has been handled; an unacknowledged
        // message would be delivered again.
        msg.Ack()
    }
}

他们在这里说:https://watermill.io/pubsubs/kafka/#partitioning
所以我定义了

// Partition-key callback: returns the value stored under the message's
// "partition" metadata entry.
// NOTE(review): presumably the returned string is used as the Kafka message
// key and hashed to pick a partition, rather than naming a partition directly.
// Confirm against the Watermill Kafka documentation.
marshaler := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
        return msg.Metadata.Get("partition"), nil
    })

这里是输出:

[watermill] 2023/08/20 15:27:31.490408 subscriber.go:152:       level=INFO  msg="Subscribing to Kafka topic" consumer_group=TestApp kafka_consumer_uuid=wUasYcSKGdHSGwGcMxnjDd provider=kafka subscriber_uuid=DDb4fnxngDBj3UTdLgVMxX topic=test2
[watermill] 2023/08/20 15:27:31.491460 subscriber.go:223:       level=INFO  msg="Starting consuming" consumer_group=TestApp kafka_consumer_uuid=wUasYcSKGdHSGwGcMxnjDd provider=kafka subscriber_uuid=DDb4fnxngDBj3UTdLgVMxX topic=test2
[watermill] 2023/08/20 15:27:31.494804 subscriber.go:152:       level=INFO  msg="Subscribing to Kafka topic" consumer_group=TestApp kafka_consumer_uuid=YGPfva2A6DGzLGbTvei6Aa provider=kafka subscriber_uuid=Uza7oyu6hNJLbhcdfgqx8Z topic=test2
[watermill] 2023/08/20 15:27:31.494804 subscriber.go:223:       level=INFO  msg="Starting consuming" consumer_group=TestApp kafka_consumer_uuid=YGPfva2A6DGzLGbTvei6Aa provider=kafka subscriber_uuid=Uza7oyu6hNJLbhcdfgqx8Z topic=test2
2023/08/20 15:27:31 go routine: two, received message: 3JtHAsycuRHLkKAzCcp36A, payload: Hello, world! 0 form partition: one
2023/08/20 15:27:32 go routine: two, received message: hUK43GezS7ZAw4McEfwKcC, payload: Hello, world! 1 form partition: two
2023/08/20 15:27:32 go routine: two, received message: Vnwd8ZoSTemAZRPyzMXDVa, payload: Hello, world! 2 form partition: one
2023/08/20 15:27:33 go routine: two, received message: TSAKqQqTX2zMkxiszZdrqT, payload: Hello, world! 3 form partition: two
2023/08/20 15:27:33 go routine: two, received message: Kc4r9YxWFgsYeiHKjohHnG, payload: Hello, world! 4 form partition: one
2023/08/20 15:27:34 go routine: two, received message: zyZDt6UX5SaESbjoi4SV98, payload: Hello, world! 5 form partition: two
2023/08/20 15:27:34 go routine: two, received message: 97CnL9NHNMDcYjnmAW2JNn, payload: Hello, world! 6 form partition: one
2023/08/20 15:27:35 go routine: two, received message: 6pydPz6gc2svodLkUJKeWc, payload: Hello, world! 7 form partition: two
2023/08/20 15:27:35 go routine: two, received message: K7oi8U3wGgJENXKpvGZCuA, payload: Hello, world! 8 form partition: one
2023/08/20 15:27:36 go routine: two, received message: KQjEvP2WN8K8TUXetphBzE, payload: Hello, world! 9 form partition: two
2023/08/20 15:27:36 go routine: two, received message: vtcNPZ9dRXDPwLAkvZx4nD, payload: Hello, world! 10 form partition: one
2023/08/20 15:27:37 go routine: two, received message: yxzdCmdSBNQCUWLhjN3BAE, payload: Hello, world! 11 form partition: two
2023/08/20 15:27:37 go routine: two, received message: kYBfwbyJ5kXQtuomYoFCRR, payload: Hello, world! 12 form partition: one
2023/08/20 15:27:38 go routine: two, received message: WcZBALBxCoa259rbzKGpJW, payload: Hello, world! 13 form partition: two
2023/08/20 15:27:38 go routine: two, received message: RbAz5C2vTXzucgKrXnN3Kf, payload: Hello, world! 14 form partition: one
2023/08/20 15:27:39 go routine: two, received message: o9MKozpz9QmjZBFnCeCJcX, payload: Hello, world! 15 form partition: two
2023/08/20 15:27:39 go routine: two, received message: EwRgqDEm5DNwsML76bjLT6, payload: Hello, world! 16 form partition: one
2023/08/20 15:27:40 go routine: two, received message: 9RDMHbGimTQjpFmKkUifz3, payload: Hello, world! 17 form partition: two

这里的“问题”是只有一个goroutine(从上面的输出看是“two”)从这个主题读取数据。我这次做错了什么?:)

8wigbo56

8wigbo563#

我有两个分区。

Topic: test2    TopicId: GEmDDeZ5QwerCsY-oRsi3w PartitionCount: 2       ReplicationFactor: 1    Configs:
        Topic: test2    Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2    Partition: 1    Leader: 0       Replicas: 0     Isr: 0

Kafka将主题的分区分配给消费者组中的消费者,因此每个分区只由消费者组中的一个消费者消费。Kafka保证一条消息只会被消费者组中的一个消费者读取。
https://medium.com/javarevisited/kafka-partitions-and-consumer-groups-in-6-mins-9e0e336c6c00
在消费者组中放入多个消费者,让它们并行地从主题中读取数据,不正是为了提高吞吐量吗?
编辑:
我想我知道问题在哪里。https://kafka.apache.org/documentation.html
具有相同事件键(event key,例如客户ID或车辆ID)的事件会被写入同一个分区,
那么,为什么下面这段代码:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/IBM/sarama"
)

const (
    topic = "test2"
)

// main sends an endless stream of keyed messages to the topic through a
// synchronous sarama producer and logs, for every successful send, the
// partition and offset the message was written to. Failed sends are logged and
// skipped; the loop keeps going.
func main() {
    brokers := []string{"localhost:9092", "localhost:9093"}
    cfg := sarama.NewConfig()
    // Enabled so the synchronous producer can report the outcome of each send
    // (see the sarama SyncProducer documentation).
    cfg.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer(brokers, cfg)
    if err != nil {
        log.Fatal(err)
    }
    counter := 0
    for {
        // NOTE(review): Partition is set to -1; with a key-based partitioner
        // this field is presumably ignored and the key decides the partition.
        // Confirm against the sarama documentation.
        msg := sarama.ProducerMessage{
            Topic:     topic,
            Partition: -1,
            Value:     sarama.StringEncoder(fmt.Sprintf("Hello world %d", counter)),
        }
        // Alternate the message key: "two" for even counters, "one" for odd.
        if counter%2 == 0 {
            msg.Key = sarama.StringEncoder("two")
        } else {
            msg.Key = sarama.StringEncoder("one")
        }

        partition, offset, err := producer.SendMessage(&msg)
        if err != nil {
            // Partition and offset are meaningless on failure, so report only
            // the error instead of also claiming the message was sent.
            log.Printf("Failed to send message: %v", err)
        } else {
            log.Printf("Sent message to partiotion %d with offset %d key %s", partition, offset, msg.Key)
        }
        counter++
        time.Sleep(10 * time.Millisecond)
    }

}

会把所有事件都发送到同一个分区呢?

2023/08/21 09:04:37 Sent message to partiotion 1 with offset 7882 key two
2023/08/21 09:04:37 Sent message to partiotion 1 with offset 7883 key one
2023/08/21 09:04:37 Sent message to partiotion 1 with offset 7884 key two
2023/08/21 09:04:37 Sent message to partiotion 1 with offset 7885 key one
2023/08/21 09:04:37 Sent message to partiotion 1 with offset 7886 key two
2023/08/21 09:04:37 Sent message to partiotion 1 with offset 7887 key one
2023/08/21 09:04:37 Sent message to partiotion 1 with offset 7888 key two

是不是这个字段:

// Alternate the message key: "two" for even counters, "one" for odd counters.
// NOTE(review): msg.Key looks like the Kafka message (event) key; presumably
// the partition is derived from it by hashing, so distinct keys are not
// guaranteed distinct partitions. Confirm against the sarama documentation.
if counter%2 == 0 {
            msg.Key = sarama.StringEncoder("two")
        } else {
            msg.Key = sarama.StringEncoder("one")
        }

负责设置事件键(event key)?
编辑2:没关系!:)它工作正常,是我误解了一个概念。:)

相关问题