我正在使用kafka站点的consumergroupexample代码测试kafka高级消费者。我想检索kafka服务器配置中关于“test”主题的所有现有消息。查看其他博客,auto.offset.reset应设置为“最小”,以便能够获取所有消息:
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
}
我真正的问题是:对于高级使用者,什么是等价的java api调用,它等价于:
bin/kafka-console-consumer.sh--缩放器localhost:2181 --topic 测试--从头开始
4条答案
按热度按时间aamkag611#
要从头获取消息,可以执行以下操作:
那就照常工作吧。。。
pxyaymoc2#
基本上,每当一个新的消费者试图消费一个主题时,他都会从头开始阅读信息。如果您每次都是为了测试而从头开始消费,那么每次您使用新的groupid初始化您的消费者时,它都会从头开始读取消息。我是这样做的:
每次都要从头开始读!
pieyvz9o3#
看起来您需要使用“低级simpleconsumer api”
对于大多数应用程序,高级使用者api已经足够好了。一些应用程序希望功能尚未向高级使用者公开(例如,在重新启动使用者时设置初始偏移量)。他们可以使用我们的低级simpleconsumerapi。逻辑会更复杂一些,你可以按照这里的例子。
此示例用于从具有以下参数的主题获取所有消息:(请注意,端口是kafka端口,而不是zookeeper端口,此示例中设置的主题):
具体来说,有一个获取readoffset的方法接受kafka.api.offsetrequest.earliesttime():
下面是另一篇文章,可能会提供一些关于如何解决这个问题的其他想法:如何从Kafka的旧偏移点获取数据?
yhqotfr84#
这些属性将帮助您。