我们最近开始使用kafka,我正在使用kafkajava本机消费者api编写一个kafka消费者应用程序。
不过,我看到的大多数示例都使用 while
循环然后调用 poll
方法对循环中的使用者对象执行。如下所示:
while (true) {
final ConsumerRecords<Long, String> consumerRecords =
consumer.poll(1000);
if (consumerRecords.count()==0) {
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}
consumerRecords.forEach(record -> {
System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
record.key(), record.value(),
record.partition(), record.offset());
});
consumer.commitAsync();
}
我只是在寻找一种更好的方法来实现这一点,而无需使用本机java消费者api进行循环。我知道用SpringKafka你不需要写这些。使用本机api怎么样?有什么好方法或最佳实践吗?
2条答案
按热度按时间kx5bkwkv1#
持续的投票“只是”Kafka消费者的工作方式,Kafka协议下的工作方式。每个操作总是由客户机(拉模型)发起,而不是由代理(推模型)发起,在使用消息的情况下,代理将转换为轮询。使用javakafkaconsumerapi意味着拥有一个循环,使用一个调度器,或者使用java的任何技术来连续地执行代码,您必须处理它。其他框架,如spring或smallryeReact式消息传递,就是为您做的。他们将轮询循环隐藏到您的应用程序中,但最终总会有一个循环。。。Kafka就是这样工作的。
hs1rzwqc2#
我试着使用一个调度程序和代码工作。
}