我在springboot中实现了一个端点,当调用它时,它将转储kafka主题中的所有消息(用于测试)。
我期望的行为是,当生产者写入“testtopic”主题,然后消费者进行投票时,它应该读取刚刚生成的消息。
我观察到的行为是,消费者未能消费生成的消息。此外,如果生产者产生更多的消息(比如10-15条),那么消费者将一次性地转储所有消息。从这一点开始,如果生产者甚至产生一条消息,那么消费者将按预期消费。
直觉上我认为 FETCH_MIN_BYTES_CONFIG
可能与此有关-可能消费者正在等待写入足够的字节。但是这已经设置为1字节(默认值),并且不能解释随后成功的单个消息读取。
接下来,我想可能是在创建主题之前注册了消费者(通过太快调用注册端点)。但我证实了 kafka-topics.sh
该主题在注册使用者之前存在。
我注意到,如果启用偏移量的autocommit,那么行为有时是预期的,有时不是。对于手动提交偏移量(下面的代码中没有显示),如上所述,这种行为非常奇怪。
我也知道制作人通过使用 kafka-console-consumer
.
还尝试将轮询超时增加到1秒,但没有成功。
// Consumer
@Component
public class TestConsumer{
private KafkaConsumer testConsumer = null;
public void registerConsumer(final String consumerId) {
if (consumer == null) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some_address>:<some_port>");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
testConsumer = new KafkaConsumer<String, String>(props);
testConsumer.subscribe(Collections.singletonList("testTopic"));
}
else{
logger.debug("Consumer already registered");
}
}
public Map<String, List<String>> consume() {
Map<String, List<String>> messages = new HashMap<>();
if (testConsumer == null){
logger.error("testConsumer was not instantiated");
return null;
}
ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(100));
List<String> buffer = new ArrayList<>();
for (ConsumerRecord<String, String> record: records){
logger.debug(String.format("Consuming %s", record.value()));
buffer.add(record.value());
}
messages.put("data", buffer);
return messages;
}
}
步骤的顺序是:1。 Spring 启动应用程序启动2。Kafka主题已创建,我可以通过Kafka控制台3确认。我注册了生产者和消费者4。生产者生产,我可以确认这与Kafka控制台(不同的消费群体)5。消费者无法消费
我预计结果如下:
{
"data" : ["message1"]
}
我得到的是
{
"data" : []
}
你知道为什么消费者在写了一定数量的消息之后才消费记录吗?
编辑\u 1:添加 props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
财产对消费者没有影响。
1条答案
按热度按时间fkvaft9z1#
当您手动调用
testConsumer.poll(Duration.ofMillis(100))
. 你需要不断地从这个主题集中注意力。就像一个无限的while循环。如:看看这个链接:Kafka消费者