我在Kafka中用 kafka-topic.sh
并用java客户端进行了测试:
kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 2 \
--topic my-topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"), new LoggingConsumerRebalanceListener(RandomStringUtils.randomAlphanumeric(3).toLowerCase()));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
Thread.sleep(500);
}
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String key = Integer.toString(i+1);
String value = RandomStringUtils.randomAlphabetic(100);
LOGGER.info("Sending message {}", key);
producer.send(new ProducerRecord<String, String>("my-topic", key, value));
Thread.sleep(100);
}
producer.close();
生产者和消费者是我独立启动的独立代码块。
我有一个观察者,下面的代码按顺序正常工作:
设置主题
运行消费者
运行生产者
运行生产者。。。
但是,按照顺序:
设置主题
运行生产者(1)
运行消费者
运行生产者
生产者第一次运行的消息丢失。稍后,如果我停止consumer,运行producer,运行consumer,我将得到所有消息。只有在第一个使用者订阅之前生成的消息才会丢失。尽管我已经在命令行中明确地创建了主题。
我做错什么了?如何防止信息丢失?
1条答案
按热度按时间fcipmucu1#
默认情况下,使用者将读取最新的偏移量。
如果运行“producer(1)”,然后启动consumer,它将忽略来自该producer的消息,并等待第二个producer调用生成的新消息。
可以通过配置auto.offset.reset更改从最新偏移量读取的行为。
稍后,如果我停止consumer,运行producer,运行consumer,我会收到所有消息
这是因为您的使用者有一个固定的consumergroup(configuration group.id),并且默认设置auto.offset.reset不再有任何影响,因为该组已向kafka注册,使用者将继续从其停止的主题中读取。
最后,如果您想在运行第二个序列时不错过任何消息,请设置
auto.offset.reset=earliest
定义一个新的group.id
.