segmentio/kafka-go reader client does not subscribe to topics and partitions

46scxncf  posted on 2022-11-21  in Apache

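The reader client does not start consuming messages. This happens intermittently, and in most cases it happens when there are no messages in the topic.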

Kafka version

Apache Kafka 3.3.0

kafka-go version

v0.4.38

Resources to reproduce the behavior:
Code:

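package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/segmentio/kafka-go"
)

// Event is not shown in the original post. The field names are inferred from
// the printed output further down; the field types are assumptions.
type Event struct {
    RequestID  string
    EventID    string
    Event      string
    MerchantID string
    Status     string
    StatusCode int
}

func main() {

    topic_name := "dev-billing"
    signals := make(chan os.Signal, 1)
    // SIGKILL cannot be caught by a process, so listen for SIGTERM instead.
    signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)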
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        sig := <-signals
        fmt.Println("Got signal: ", sig)
        cancel()
    }()

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:                []string{"0.0.0.0:9092"},
        GroupID:                "consumer-group-biller",
        GroupTopics:            []string{},
        Topic:                  topic_name,
        QueueCapacity:          10,
        MinBytes:               10e3,
        MaxBytes:               10e6,
        MaxWait:                3 * time.Second,
        PartitionWatchInterval: 5 * time.Second,
        WatchPartitionChanges:  true,
        StartOffset:            kafka.LastOffset,
        ReadBackoffMax:         10 * time.Second,
        Logger:                 log.Default(),
        OffsetOutOfRangeError:  true,
    })

    i := 0

    // Close the reader when main returns.
    defer func() {
        err := r.Close()
        if err != nil {
            fmt.Println("Error closing consumer: ", err)
            return
        }
        fmt.Println("Consumer closed")
    }()

    for {
        m, err := r.FetchMessage(ctx)
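        if err != nil {
            // FetchMessage fails once ctx is cancelled or the reader is closed.
            fmt.Println("Stopping consumer: ", err)
            break
        }
        content := Event{}
        // m.Value is already a []byte, so no conversion is needed.
        if err := json.Unmarshal(m.Value, &content); err != nil {
            fmt.Println("Failed to decode message: ", err)
        }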

        fmt.Printf("%+v\n", content)

        if content.StatusCode == 200 {
            i++
        }

        if err := r.CommitMessages(ctx, m); err != nil {
            log.Fatal("failed to commit messages:", err)
        }
        fmt.Println("Total:", i)
    }

    // The deferred r.Close() above runs here, so no second Close call is needed.
}

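Expected behavior

The consumer should start consuming messages from the topic's partitions as soon as it starts, beginning at the last offset.

Observed behavior

When the producer is already producing, the consumer fails to subscribe to the topic. If the consumer is started before the producer, it works.
Error log: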

2022/11/17 14:25:34 entering loop for consumer group, consumer-group-biller
2022/11/17 14:25:37 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 joinGroup succeeded for response, consumer-group-biller.  generationID=44, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff
2022/11/17 14:25:37 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 received empty assignments for group, consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff for generation 44
2022/11/17 14:25:37 sync group finished for group, consumer-group-biller
2022/11/17 14:25:37 subscribed to topics and partitions: map[]
2022/11/17 14:25:37 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:25:37 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:25:37 started commit for group consumer-group-biller

When it works:

2022/11/17 14:09:02 entering loop for consumer group, consumer-group-biller
2022/11/17 14:09:04 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 joinGroup succeeded for response, consumer-group-biller.  generationID=35, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273
2022/11/17 14:09:04 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 sync group finished for group, consumer-group-biller
2022/11/17 14:09:04 subscribed to topics and partitions: map[{topic:dev-billing partition:0}:25]
2022/11/17 14:09:04 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:09:04 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:09:04 initializing kafka reader for partition 0 of dev-billing starting at offset 25
2022/11/17 14:09:04 started commit for group consumer-group-biller
2022/11/17 14:09:04 the kafka reader for partition 0 of dev-billing is seeking to offset 25
{RequestID:f9f1971f-3d5b-4ef5-b92b-880fe094887e EventID:0d52de85-8da4-435d-8d19-e267947670c3 Event:example MerchantID:id Status:OK StatusCode:200}
2022/11/17 14:09:04 committed offsets for group consumer-group-biller:

Reference: https://github.com/segmentio/kafka-go

0tdrvxhp  #1

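The library has had problems recently. I suggest downgrading for now and reporting the problem on GitHub.
Specifically, you can downgrade to v0.4.35. v0.4.36 introduced some refactoring of consumer groups, and the issues page shows that it has caused problems with consumer groups.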
