有时(似乎很随机)Kafka会发送旧信息。我只想要最新的消息,这样它就可以用相同的密钥覆盖消息。目前看来,我有多个消息与同一个键它没有得到压缩。
我在主题中使用此设置:
cleanup.policy=compact
我使用的是java/kotlin和apachekafka1.1.1客户端。
Properties(8).apply {
val jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
val jaasCfg = String.format(jaasTemplate, Configuration.kafkaUsername, Configuration.kafkaPassword)
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS)
put(ConsumerConfig.GROUP_ID_CONFIG,
"ApiKafkaKotlinConsumer${Configuration.kafkaGroupId}")
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer::class.java.name)
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer::class.java.name)
put("security.protocol", "SASL_SSL")
put("sasl.mechanism", "SCRAM-SHA-256")
put("sasl.jaas.config", jaasCfg)
put("max.poll.records", 100)
put("receive.buffer.bytes", 1000000)
}
我错过了一些设置吗?
1条答案
按热度按时间46scxncf1#
如果希望每个键只有一个值,则必须使用
KTable<K,V>
抽象:StreamsBuilder::table(final String topic)
来自Kafka河。此处使用的主题应将清除策略设置为compact
.如果你使用kafkaconsumer,你只需要从代理那里获取数据。它没有提供任何执行某种重复数据消除的机制。根据是否执行了压缩,您可以为同一密钥获得1到n条消息。
关于压实
压缩并不意味着立即删除同一个键的所有旧值。什么时候
old
同一密钥的消息将被删除,具体取决于多个属性。最重要的是:log.cleaner.min.cleanable.ratio
一个日志的脏日志与总日志的最小比率,即符合清除条件的日志log.cleaner.min.compaction.lag.ms
消息在日志中保持未压缩状态的最短时间。仅适用于正在压实的原木。log.cleaner.enable
启用日志清理器进程在服务器上运行。如果将任何主题与cleanup.policy=compact(包括内部偏移量主题)一起使用,则应启用。如果禁用这些主题,这些主题将不会被压缩并不断增大。你可以找到更多关于压缩的细节https://kafka.apache.org/documentation/#compaction