kafka使用者赋值返回空集

rpppsulh  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(427)

我已经订阅了Kafka主题如下。我只需要在消费者被分配了分区之后运行一些逻辑。
然而 consumer.assignment() 不管我等多久,我都会像一个空壳回来。如果我没有while循环 consumer.poll() 我确实从这个主题上得到了记录。有人能告诉我为什么会这样吗?

consumer.subscribe(topics);
    Set<TopicPartition> assigned=Collections.emptySet();
    while(isAssigned) {
          assigned = consumer.assignment();
          if(!assigned.isEmpty()) {
              isAssigned= false;
          }
    }

//consumer props
Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092,yyy:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://xxx:8081");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("max.poll.records", "100");
0sgqnhkj

0sgqnhkj1#

直到你打电话来 poll() ,消费者只是在闲逛。
只有在 poll() 调用时,它将启动到集群的连接,获取分配的分区并尝试获取消息。
javadoc中提到了这一点:
订阅一组主题后,消费者将在调用poll(duration)时自动加入组。
一旦你开始打电话 poll() ,您可以使用 assignment() 甚至注册一个 ConsumerRebalanceListener 在从使用者示例分配或撤销分区时通知。

相关问题