最近,在使用kafka时,我的应用程序需要从一开始就访问一个主题中的所有消息。因此,在编写kafka消费者(使用javaapi)时,我能够从头开始读取消息,但它只返回主题中的前500条消息。试图增加
props.put(consumerconfig.max\u poll\u records\u config,integer.max\u value);props.put(consumerconfig.fetch\u max\u bytes\u config,long.max\u value);
但它仍然不会返回所有消息,而在使用cli命令时,
kafka控制台使用者--引导服务器localhost:9092 --topic --从一开始
它会返回我所有的5000条记录。
伙计们,有什么配置丢失了吗?任何帮助都是值得感激的。。
[编辑:1]
消费者代码。
public ConsumerRecords<byte[], byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
consumer.subscribe(new ArrayList<String>(Collections.singletonList(topicname)));
consumer.poll(0);
// Reading topic offset from beginning
consumer.seekToBeginning(consumer.assignment());
// poll and time-out if no replies
ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
consumer.close();
return records;
}
然而,我改变了消费者:
public Map<String, byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
Map<String, byte[]> entityMap = new HashMap<String, byte[]>();
boolean stop = false;
consumer.subscribe(new ArrayList<String>(Collections.singletonList(topicname)));
consumer.poll(0);
// Reading topic offset from beginning
consumer.seekToBeginning(consumer.assignment());
while (!stop) {
// Request unread messages from the topic.
ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(1000);
Iterator<ConsumerRecord<byte[], byte[]>> iterator = consumerRecords.iterator();
if (iterator.hasNext()) {
while (iterator.hasNext()) {
ConsumerRecord<byte[], byte[]> record = iterator.next();
// Iterate through returned records, extract the value
// of each message, and print the value to standard output.
entityMap.put(new String(record.key()), record.value());
}
} else {
stop = true;
}
}
return entityMap;
}
虽然现在它正在获取所有记录,但我想知道是否有更好的方法。
1条答案
按热度按时间yzuktlbb1#
使用没有错
seekToBeginning()
使用所有消息。然而,有一种更灵活的方法可以达到同样的效果。您可以通过配置来实现这一点,这允许您从开始到结束都使用相同的代码。这也是方法
kafka-console-consumer.sh
工具用途:套
auto.offset.reset
至earliest
套group.id
新的/随机值。如果你不想跟踪这个消费者的立场,但总是想从一开始,你也可以设置enable.auto.commit
为避免污染主题,请输入false。删除
seekToBeginning()
根据你的逻辑关于你的逻辑,你应该考虑以下几点:
有些情况下
poll()
可以返回空集合,即使它尚未到达结尾。还有一个主题是流(无界),结尾可以移动。不管怎样你都可以用endOffsets()
查找当前结束偏移量并将其与返回的消息的偏移量进行比较你可能不想投票,直到你到达终点。一个主题的大小可以是几GB,并且包含数百万条记录。将所有内容存储在Map中很容易导致内存不足问题。