当没有消费者连接时,kafka代理能保留消息吗?

lymgl2op  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(495)

我正在尝试构建一个pub/sub应用程序,我正在探索最好的工具。我目前正在看Kafka和有一个小演示应用程序已经运行。然而,我遇到了一个概念上的问题。
我有一个生产者(java代码):

String topicName = "MyTopic;
    String key = "MyKey";

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092,localhost:9093");
    props.put("acks", "all");
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    Producer<String, byte[]> producer = new KafkaProducer <String, byte[]>(props);

    byte[] data = <FROM ELSEWHERE>;
    ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topicName, key, data);

    try {
        RecordMetadata result = producer.send(record).get();
    }
    catch (Exception e) {
        // Nothing for now
    }
    producer.close();

当我通过kakfa命令行工具启动消费者时:

kafka-console-consumer --bootstrap-server localhost:9092 --topic MyTopic

然后我执行生产者代码,我看到数据信息显示在我的消费终端上。
但是,如果在执行producer之前没有运行consumer,则消息将显示为“lost”。当我启动消费者时(在执行生产者之后),消费者终端中没有显示任何内容。
有人知道在没有消费者连接的情况下,kafka代理是否可以保留消息吗?如果是,怎么做?

bz4sfanl

bz4sfanl1#

追加 --from-beginning 到console consumer命令,使其从最早的偏移量开始使用。这实际上是关于由配置控制的偏移重置策略 auto.offset.reset . 以下是此配置的含义:
如果kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办: earliest :自动将偏移量重置为最早的偏移量 latest :自动将偏移重置为最新偏移 none :throw exception to the consumer if no previous offset is found for the consumer's group anything:throw exception to the consumer.如果没有为使用者的组找到以前的偏移量,则向使用者抛出异常。其他:向使用者抛出异常。

相关问题