包无法从分区中使用

bnl4lu3b  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(229)

我不能从一个主题中消费,我不确定我的代码或Kafka配置是否有问题。我遇到的问题是,它被卡在print语句“starts”上,因此无法从通道<-partitionconsumer.messages()接收消息。
这些是我为我的Kafka设置采取的步骤(https://kafka.apache.org/quickstart)我确信它们确实存在,因为当我运行下面的命令时,我看到了值。
bin/kafka-console-consumer.sh--分区0--主题测试--引导服务器localhost:9092 --offset 最早
bin/zookeeper-server-start.sh配置/zookeeper.properties
bin/kafka-server-start.sh配置/server.properties
bin/kafka-topics.sh--创建--缩放器localhost:2181 --replication-factor 1--分区1--主题测试
bin/kafka-console-producer.sh—代理列表localhost:9092 --topic 测试

func RetrieveConsumed() (int){
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        fmt.Println("ERROR")
        panic(err)
    }

    defer func() {
        if err := consumer.Close(); err != nil {
            fmt.Println("ERROR")
        }
    }()

    partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
    if err != nil {
        fmt.Println("ERROR")
        panic(err)
    }
    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            fmt.Println("ERROR")
        }
    }()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    consumed := 0
    ConsumerLoop:
        for {
            fmt.Println("Starts")
            select {
            case msg := <-partitionConsumer.Messages():
                fmt.Printf("Consumed message offset %d\n", msg.Offset)
                consumed++
            case <- partitionConsumer.Errors():
                break ConsumerLoop
            case <-signals:
                break ConsumerLoop
            }
        }

        fmt.Printf("Consumed: %d\n", consumed)
        return consumed
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题