我需要在晚上执行一个作业,它将获取kafka队列中的所有消息,并对它们执行一个进程。我可以得到消息,但Kafka流正在等待更多的消息,我无法继续我的进程。我有以下代码:
...
private ConsumerConnector consumerConnector;
private final static String TOPIC = "test";
public MessageStreamConsumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "localhost:2181");
properties.put("group.id", "test-group");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
public List<String> getMessages() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
List<String> messages = new ArrayList<>();
while (it.hasNext())
messages.add(new String(it.next().message()));
return messages;
}
代码能够获取消息,但当它处理最后一条消息时,它将保留在行中:
while (it.hasNext())
问题是,我怎样才能从Kafka那里得到所有的信息,停止流并继续我的其他任务。
我希望你能帮助我
谢谢
3条答案
按热度按时间yjghlzjz1#
我目前正在使用kafka 0.10.0.1进行开发,发现了有关使用consumer property auto.offset.reset的混合信息,所以我做了一些实验来找出实际发生的情况。
基于这些,我现在这样理解:当设置属性时:
这会将使用者定位到分配的分区中的第一条可用消息(在分区上未进行任何提交时),或者将使用者定位到最后提交的分区偏移量(请注意,您总是提交上次读取偏移量+1,否则您将在每次重新启动时重新读取最后提交的消息)消费者)
或者,不设置auto.offset.reset,这意味着将使用默认值“latest”。
在这种情况下,您不会收到任何有关连接使用者的旧消息-只会收到连接使用者后发布到主题的消息。
最后,如果要确保接收某个主题和指定分区的所有可用消息,则必须调用seektobeginning()。
似乎建议首先调用poll(0l),以确保您的使用者获得分配的分区(或者在ConsumerBalanceListener中实现您的代码!),然后将每个分配的分区搜索到“开始”:
57hvy0tb2#
像这样的办法也许行得通。基本上,这个想法是使用Kafka消费者和民意调查,直到你得到一些记录,然后停止时,你得到一个空批。
ekqde3dh3#
Kafka流似乎从一开始就不支持消费。
您可以创建一个本地kafka消费者并设置
auto.offset.reset
到最早,它将从一开始就消耗消息。