Kafka消费者轮询和重新连接

y4ekin9u  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(1249)

我们刚开始在我们的项目中使用Kafka。我们使用的是Kafka2.11-0.9.0.0。我有一些关于Kafka消费者的问题。
1) 在启动zookeeper和kafka服务器之前,我启动了kafka consumer,但我的kafkaconsumer客户端仍然能够连接。我有以下代码行

Consumer<String, String> consumer =  new KafkaConsumer<String,String>(props);
    consumer.subscribe(getConsumerRegisteredTopics());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records){
           processRecord (record)
        }
   }

2) 我读到,zookeeper通过poll(longtimeout)方法调用跟踪活动消费者。如果我使用long.max\u value has timeout in poll(),zookeeper将如何跟踪我的消费者。你能帮我理解Kafka消费者民意测验的行为吗。
提前谢谢。

wn9m85ua

wn9m85ua1#

1) 如果在启动消费者之前没有启动zookeeper和kafka,它将无法连接,但会尝试从kafka读取元数据。我的经验是,kafkaconsumer'poll'调用将在能够连接和读取元数据之前不确定地阻塞。换句话说。。。您的消费者实际上没有连接,但正在等待kafka群集出现。
2) 轮询超时告诉使用者在返回任何数据之前要等待多长时间。您必须确保在poll返回后,您再次调用poll足够快,以便您的消费者保持活跃。为轮询调用指定的超时与kafkaconsumer的keepalive机制无关(这由使用者属性的session.timeout.ms属性控制)。

相关问题