我们刚开始在我们的项目中使用Kafka。我们使用的是Kafka2.11-0.9.0.0。我有一些关于Kafka消费者的问题。
1) 在启动zookeeper和kafka服务器之前,我启动了kafka consumer,但我的kafkaconsumer客户端仍然能够连接。我有以下代码行
Consumer<String, String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(getConsumerRegisteredTopics());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records){
processRecord (record)
}
}
2) 我读到,zookeeper通过poll(longtimeout)方法调用跟踪活动消费者。如果我使用long.max\u value has timeout in poll(),zookeeper将如何跟踪我的消费者。你能帮我理解Kafka消费者民意测验的行为吗。
提前谢谢。
1条答案
按热度按时间wn9m85ua1#
1) 如果在启动消费者之前没有启动zookeeper和kafka,它将无法连接,但会尝试从kafka读取元数据。我的经验是,kafkaconsumer'poll'调用将在能够连接和读取元数据之前不确定地阻塞。换句话说。。。您的消费者实际上没有连接,但正在等待kafka群集出现。
2) 轮询超时告诉使用者在返回任何数据之前要等待多长时间。您必须确保在poll返回后,您再次调用poll足够快,以便您的消费者保持活跃。为轮询调用指定的超时与kafkaconsumer的keepalive机制无关(这由使用者属性的session.timeout.ms属性控制)。