Kafka只订阅最新消息?

jgzswidk  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(446)

有时(似乎很随机)Kafka会发送旧信息。我只想要最新的消息,这样它就可以用相同的密钥覆盖消息。目前看来,我有多个消息与同一个键它没有得到压缩。
我在主题中使用此设置:

cleanup.policy=compact

我使用的是java/kotlin和apachekafka1.1.1客户端。

Properties(8).apply {
    val jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
    val jaasCfg = String.format(jaasTemplate, Configuration.kafkaUsername, Configuration.kafkaPassword)
    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            BOOTSTRAP_SERVERS)
    put(ConsumerConfig.GROUP_ID_CONFIG,
            "ApiKafkaKotlinConsumer${Configuration.kafkaGroupId}")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer::class.java.name)

    put("security.protocol", "SASL_SSL")
    put("sasl.mechanism", "SCRAM-SHA-256")
    put("sasl.jaas.config", jaasCfg)
    put("max.poll.records", 100)
    put("receive.buffer.bytes", 1000000)
}

我错过了一些设置吗?

46scxncf

46scxncf1#

如果希望每个键只有一个值,则必须使用 KTable<K,V> 抽象: StreamsBuilder::table(final String topic) 来自Kafka河。此处使用的主题应将清除策略设置为 compact .
如果你使用kafkaconsumer,你只需要从代理那里获取数据。它没有提供任何执行某种重复数据消除的机制。根据是否执行了压缩,您可以为同一密钥获得1到n条消息。
关于压实
压缩并不意味着立即删除同一个键的所有旧值。什么时候 old 同一密钥的消息将被删除,具体取决于多个属性。最重要的是: log.cleaner.min.cleanable.ratio 一个日志的脏日志与总日志的最小比率,即符合清除条件的日志 log.cleaner.min.compaction.lag.ms 消息在日志中保持未压缩状态的最短时间。仅适用于正在压实的原木。 log.cleaner.enable 启用日志清理器进程在服务器上运行。如果将任何主题与cleanup.policy=compact(包括内部偏移量主题)一起使用,则应启用。如果禁用这些主题,这些主题将不会被压缩并不断增大。
你可以找到更多关于压缩的细节https://kafka.apache.org/documentation/#compaction

相关问题