kafka消费者只在产生“足够”的数据之后才读取

dauxcl2d  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(409)

我在springboot中实现了一个端点,当调用它时,它将转储kafka主题中的所有消息(用于测试)。
我期望的行为是,当生产者写入“testtopic”主题,然后消费者进行投票时,它应该读取刚刚生成的消息。
我观察到的行为是,消费者未能消费生成的消息。此外,如果生产者产生更多的消息(比如10-15条),那么消费者将一次性地转储所有消息。从这一点开始,如果生产者甚至产生一条消息,那么消费者将按预期消费。
直觉上我认为 FETCH_MIN_BYTES_CONFIG 可能与此有关-可能消费者正在等待写入足够的字节。但是这已经设置为1字节(默认值),并且不能解释随后成功的单个消息读取。
接下来,我想可能是在创建主题之前注册了消费者(通过太快调用注册端点)。但我证实了 kafka-topics.sh 该主题在注册使用者之前存在。
我注意到,如果启用偏移量的autocommit,那么行为有时是预期的,有时不是。对于手动提交偏移量(下面的代码中没有显示),如上所述,这种行为非常奇怪。
我也知道制作人通过使用 kafka-console-consumer .
还尝试将轮询超时增加到1秒,但没有成功。

  1. // Consumer
  2. @Component
  3. public class TestConsumer{
  4. private KafkaConsumer testConsumer = null;
  5. public void registerConsumer(final String consumerId) {
  6. if (consumer == null) {
  7. Properties props = new Properties();
  8. props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some_address>:<some_port>");
  9. props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  10. props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  11. props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
  12. testConsumer = new KafkaConsumer<String, String>(props);
  13. testConsumer.subscribe(Collections.singletonList("testTopic"));
  14. }
  15. else{
  16. logger.debug("Consumer already registered");
  17. }
  18. }
  19. public Map<String, List<String>> consume() {
  20. Map<String, List<String>> messages = new HashMap<>();
  21. if (testConsumer == null){
  22. logger.error("testConsumer was not instantiated");
  23. return null;
  24. }
  25. ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(100));
  26. List<String> buffer = new ArrayList<>();
  27. for (ConsumerRecord<String, String> record: records){
  28. logger.debug(String.format("Consuming %s", record.value()));
  29. buffer.add(record.value());
  30. }
  31. messages.put("data", buffer);
  32. return messages;
  33. }
  34. }

步骤的顺序是:1。 Spring 启动应用程序启动2。Kafka主题已创建,我可以通过Kafka控制台3确认。我注册了生产者和消费者4。生产者生产,我可以确认这与Kafka控制台(不同的消费群体)5。消费者无法消费
我预计结果如下:

  1. {
  2. "data" : ["message1"]
  3. }

我得到的是

  1. {
  2. "data" : []
  3. }

你知道为什么消费者在写了一定数量的消息之后才消费记录吗?
编辑\u 1:添加 props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 财产对消费者没有影响。

fkvaft9z

fkvaft9z1#

当您手动调用 testConsumer.poll(Duration.ofMillis(100)) . 你需要不断地从这个主题集中注意力。就像一个无限的while循环。如:

  1. while (true) {
  2. Map records = consume();
  3. logger.debug("received records: {}", records);
  4. }

看看这个链接:Kafka消费者

相关问题