在第一个消费者连接之前生成的消息丢失

atmip9wb  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(341)

我在Kafka中用 kafka-topic.sh 并用java客户端进行了测试:

  1. kafka-topics.sh \
  2. --create \
  3. --zookeeper localhost:2181 \
  4. --replication-factor 1 \
  5. --partitions 2 \
  6. --topic my-topic
  7. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  8. consumer.subscribe(Arrays.asList("my-topic"), new LoggingConsumerRebalanceListener(RandomStringUtils.randomAlphanumeric(3).toLowerCase()));
  9. while (true) {
  10. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
  11. for (ConsumerRecord<String, String> record : records)
  12. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  13. Thread.sleep(500);
  14. }
  15. Producer<String, String> producer = new KafkaProducer<>(props);
  16. for (int i = 0; i < 10; i++) {
  17. String key = Integer.toString(i+1);
  18. String value = RandomStringUtils.randomAlphabetic(100);
  19. LOGGER.info("Sending message {}", key);
  20. producer.send(new ProducerRecord<String, String>("my-topic", key, value));
  21. Thread.sleep(100);
  22. }
  23. producer.close();

生产者和消费者是我独立启动的独立代码块。
我有一个观察者,下面的代码按顺序正常工作:
设置主题
运行消费者
运行生产者
运行生产者。。。
但是,按照顺序:
设置主题
运行生产者(1)
运行消费者
运行生产者
生产者第一次运行的消息丢失。稍后,如果我停止consumer,运行producer,运行consumer,我将得到所有消息。只有在第一个使用者订阅之前生成的消息才会丢失。尽管我已经在命令行中明确地创建了主题。
我做错什么了?如何防止信息丢失?

fcipmucu

fcipmucu1#

默认情况下,使用者将读取最新的偏移量。
如果运行“producer(1)”,然后启动consumer,它将忽略来自该producer的消息,并等待第二个producer调用生成的新消息。
可以通过配置auto.offset.reset更改从最新偏移量读取的行为。
稍后,如果我停止consumer,运行producer,运行consumer,我会收到所有消息
这是因为您的使用者有一个固定的consumergroup(configuration group.id),并且默认设置auto.offset.reset不再有任何影响,因为该组已向kafka注册,使用者将继续从其停止的主题中读取。
最后,如果您想在运行第二个序列时不错过任何消息,请设置 auto.offset.reset=earliest 定义一个新的 group.id .

相关问题