Kafka高级使用者使用Java API从topic中获取所有消息(等效于--from-begin)

omqzjyyz  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(409)

我正在使用kafka站点的consumergroupexample代码测试kafka高级消费者。我想检索kafka服务器配置中关于“test”主题的所有现有消息。查看其他博客,auto.offset.reset应设置为“最小”,以便能够获取所有消息:

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId)    {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "10000");     

    return new ConsumerConfig(props);
}

我真正的问题是:对于高级使用者,什么是等价的java api调用,它等价于:
bin/kafka-console-consumer.sh--缩放器localhost:2181 --topic 测试--从头开始

aamkag61

aamkag611#

要从头获取消息,可以执行以下操作:

import kafka.utils.ZkUtils;
ZkUtils.maybeDeletePath("zkhost:zkport", "/consumers/group.id");

那就照常工作吧。。。

pxyaymoc

pxyaymoc2#

基本上,每当一个新的消费者试图消费一个主题时,他都会从头开始阅读信息。如果您每次都是为了测试而从头开始消费,那么每次您使用新的groupid初始化您的消费者时,它都会从头开始读取消息。我是这样做的:

properties.put("group.id", UUID.randomUUID().toString());

每次都要从头开始读!

pieyvz9o

pieyvz9o3#

看起来您需要使用“低级simpleconsumer api”
对于大多数应用程序,高级使用者api已经足够好了。一些应用程序希望功能尚未向高级使用者公开(例如,在重新启动使用者时设置初始偏移量)。他们可以使用我们的低级simpleconsumerapi。逻辑会更复杂一些,你可以按照这里的例子。
此示例用于从具有以下参数的主题获取所有消息:(请注意,端口是kafka端口,而不是zookeeper端口,此示例中设置的主题):

10 my-replicated-topic 0 localhost 9092

具体来说,有一个获取readoffset的方法接受kafka.api.offsetrequest.earliesttime():

long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

下面是另一篇文章,可能会提供一些关于如何解决这个问题的其他想法:如何从Kafka的旧偏移点获取数据?

yhqotfr8

yhqotfr84#

Properties props = new Properties(); 
 props.put("bootstrap.servers", "localhost:9092");
 props.put("auto.offset.reset", "earliest");
 props.put("group.id", UUID.randomUUID().toString());

这些属性将帮助您。

相关问题