我正在尝试构建一个pub/sub应用程序,我正在探索最好的工具。我目前正在看Kafka和有一个小演示应用程序已经运行。然而,我遇到了一个概念上的问题。
我有一个生产者(java代码):
String topicName = "MyTopic;
String key = "MyKey";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("acks", "all");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Producer<String, byte[]> producer = new KafkaProducer <String, byte[]>(props);
byte[] data = <FROM ELSEWHERE>;
ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topicName, key, data);
try {
RecordMetadata result = producer.send(record).get();
}
catch (Exception e) {
// Nothing for now
}
producer.close();
当我通过kakfa命令行工具启动消费者时:
kafka-console-consumer --bootstrap-server localhost:9092 --topic MyTopic
然后我执行生产者代码,我看到数据信息显示在我的消费终端上。
但是,如果在执行producer之前没有运行consumer,则消息将显示为“lost”。当我启动消费者时(在执行生产者之后),消费者终端中没有显示任何内容。
有人知道在没有消费者连接的情况下,kafka代理是否可以保留消息吗?如果是,怎么做?
1条答案
按热度按时间bz4sfanl1#
追加
--from-beginning
到console consumer命令,使其从最早的偏移量开始使用。这实际上是关于由配置控制的偏移重置策略auto.offset.reset
. 以下是此配置的含义:如果kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办:
earliest
:自动将偏移量重置为最早的偏移量latest
:自动将偏移重置为最新偏移none
:throw exception to the consumer if no previous offset is found for the consumer's group anything:throw exception to the consumer.如果没有为使用者的组找到以前的偏移量,则向使用者抛出异常。其他:向使用者抛出异常。